Simplified directory.

This commit is contained in:
Paul Masurel
2016-01-17 23:34:51 +09:00
parent 726d38d26d
commit c054b7410a
7 changed files with 351 additions and 219 deletions

View File

@@ -4,27 +4,35 @@ use std::io::Write;
use fst::MapBuilder;
use core::error::*;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
pub trait SegmentOutput<'a, W: Write> {
fn terms(&self,) -> W;
fn postings(&self,) -> W;
// TODO positions, docvalues, ...
}
use core::directory::Segment;
use core::directory::SegmentComponent;
pub trait Codec {
fn write<'a, I: SerializableSegment<'a>, W: Write>(index: &'a I, output: &'a SegmentOutput<'a, W>) -> Result<usize>;
fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result<usize>;
}
pub struct SimpleCodec;
impl SimpleCodec {
fn write_postings<D: DocCursor, W: Write>(mut doc_it: D, postings: &mut W) -> Result<usize> {
fn write_postings<D: DocCursor, W: Write>(doc_it: D, postings: &mut W) -> Result<usize> {
let mut written_bytes: usize = 4;
postings.write_u32::<LittleEndian>(doc_it.len() as u32);
// TODO handle error correctly
match postings.write_u32::<LittleEndian>(doc_it.len() as u32) {
Ok(_) => {},
Err(_) => {
let msg = String::from("Failed writing posting list length");
return Err(Error::WriteError(msg));
},
}
for doc_id in doc_it {
postings.write_u32::<LittleEndian>(doc_id as u32);
println!("doc {}", doc_id);
match postings.write_u32::<LittleEndian>(doc_id as u32) {
Ok(_) => {},
Err(_) => {
let msg = String::from("Failed while writing posting list");
return Err(Error::WriteError(msg));
},
}
written_bytes += 4;
}
Ok(written_bytes)
@@ -32,17 +40,18 @@ impl SimpleCodec {
}
impl Codec for SimpleCodec {
fn write<'a, I: SerializableSegment<'a>, W: Write>(index: &'a I, output: &'a SegmentOutput<'a, W>) -> Result<usize> {
let term_trie_builder_result = MapBuilder::new(output.terms());
fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result<usize> {
let mut term_write = try!(segment.open_writable(SegmentComponent::TERMS));
let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));
let term_trie_builder_result = MapBuilder::new(term_write);
if term_trie_builder_result.is_err() {
// TODO include cause somehow
return Err(Error::IOError(String::from("Failed creating the term builder")));
return Err(Error::WriteError(String::from("Failed creating the term builder")));
}
let mut term_buffer: String = String::new();
let mut term_trie_builder = term_trie_builder_result.unwrap();
let mut term_cursor = index.term_cursor();
let mut offset: usize = 0;
let mut postings_output = output.postings();
loop {
match term_cursor.next() {
Some((term, doc_it)) => {
@@ -50,10 +59,10 @@ impl Codec for SimpleCodec {
match term_trie_builder.insert(&term_buffer, offset as u64) {
Ok(_) => {}
Err(_) => {
return Err(Error::IOError(String::from("Failed while inserting into the fst")))
return Err(Error::WriteError(String::from("Failed while inserting into the fst")))
},
}
offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_output));
offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_write));
},
None => {
break;
@@ -64,49 +73,3 @@ impl Codec for SimpleCodec {
}
}
// impl DebugCodec {
// fn write_field(field_name) {
//
// }
//
// fn serialize_postings_for_term() -> Result<usize, io::Error> {
//
// }
// }
//
// impl Codec for DebugCodec {
// fn write<'a, I: SerializableSegment<'a>>(index: &I, output: &SegmentOutput) -> Result<usize, io::Error> {
// let mut field_cursor = index.field_cursor();
// let mut posting_offset: usize = 0;
// loop {
// match field_cursor.next() {
// Some(field) => {
// let field_name = field_cursor.get_field();
// try!(DebugCodec::write_term(field_name, posting_offset, output.terms));
// let term_cursor = field_cursor.term_cursor();
// let len = try!(DebugCodec::serialize_postings_for_term(term_cursor));
// posting_offset += len;
// },
// None => { break; },
// }
// }
// Ok(1)
// }
// }
//
// let mut field_cursor = closed_index_writer.field_cursor();
// loop {
// match field_cursor.next() {
// Some(field) => {
// println!(" {:?}", field);
// show_term_cursor(field_cursor.term_cursor());
// },
// None => { break; },
// }
// }

View File

@@ -4,11 +4,16 @@ use self::memmap::{Mmap, Protection};
use std::path::PathBuf;
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::io::BufWriter;
use std::io;
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::rc::Rc;
use std::ops::Deref;
use std::cell::RefCell;
use std::sync::Arc;
use core::error::*;
use rand::{thread_rng, Rng};
@@ -23,183 +28,348 @@ pub fn generate_segment_name() -> SegmentId {
SegmentId( String::from("_") + &random_name)
}
pub trait Dir {
fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<SharedMmapMemory, io::Error>; // {
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Directory {
dir: Rc<Dir>,
index_path: PathBuf,
// mmap_cache: RefCell<HashMap<PathBuf, SharedMmapMemory >>,
}
impl Directory {
fn segment(&self, segment_id: &SegmentId) -> Segment {
pub fn from(filepath: &str) -> Directory {
Directory {
index_path: PathBuf::from(filepath)
}
}
fn resolve_path(&self, relative_path: &PathBuf) -> PathBuf {
self.index_path.join(relative_path)
}
fn segment<'a>(&'a self, segment_id: &SegmentId) -> Segment<'a> {
Segment {
directory: self.dir.clone(),
directory: self,
segment_id: segment_id.clone()
}
}
pub fn new_segment(&self,) -> Segment {
pub fn new_segment<'a>(&'a self,) -> Segment<'a> {
// TODO check it does not exists
self.segment(&generate_segment_name())
}
fn from<T: Dir + 'static>(directory: T) -> Directory {
Directory {
dir: Rc::new(directory),
fn open_writable<'a>(&self, relative_path: &PathBuf) -> Result<File> {
let full_path = self.resolve_path(relative_path);
match File::create(full_path.clone()) {
Ok(f) => Ok(f),
Err(err) => {
let path_str = full_path.to_str().unwrap_or("<error on to_str>");
return Err(Error::IOError(err.kind(), String::from("Could not create file") + path_str))
}
}
}
pub fn open(path_str: &str) -> Directory {
let path = PathBuf::from(path_str);
Directory::from(FileDirectory::for_path(path))
}
pub fn in_mem() -> Directory {
Directory::from(MemDirectory::new())
}
}
impl Dir for Directory {
fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<SharedMmapMemory, io::Error> {
self.dir.get_data(segment_id, component)
}
}
/////////////////////////
// Segment
pub enum SegmentComponent {
POSTINGS,
POSITIONS,
TERMS,
}
pub struct Segment {
directory: Rc<Dir>,
#[derive(Debug)]
pub struct Segment<'a> {
directory: &'a Directory,
segment_id: SegmentId,
}
impl Segment {
impl<'a> Segment<'a> {
fn path_suffix(component: SegmentComponent)-> &'static str {
match component {
SegmentComponent::POSTINGS => ".pstgs",
SegmentComponent::POSTINGS => ".idx",
SegmentComponent::POSITIONS => ".pos",
SegmentComponent::TERMS => ".term",
}
}
fn get_data(&self, component: SegmentComponent) -> Result<SharedMmapMemory, io::Error> {
self.directory.get_data(&self.segment_id, component)
fn get_relative_path(&self, component: SegmentComponent) -> PathBuf {
let SegmentId(ref segment_id_str) = self.segment_id;
let filename = String::new() + segment_id_str + Segment::path_suffix(component);
PathBuf::from(filename)
}
// pub fn get_data(&self, component: SegmentComponent) -> Result<Box<Borrow<[u8]>>> {
// self.directory.get_data(&self.segment_id, component)
// }
pub fn open_writable(&self, component: SegmentComponent) -> Result<File> {
let path = self.get_relative_path(component);
self.directory.open_writable(&path)
}
}
/////////////////////////////////////////////////////////
// MemoryPointer
pub trait MemoryPointer {
fn data(&self) -> &[u8];
}
/////////////////////////////////////////////////////////
// ResidentMemoryPointer
pub struct ResidentMemoryPointer {
data: Box<[u8]>,
len: usize,
}
impl MemoryPointer for ResidentMemoryPointer {
fn data(&self) -> &[u8] {
self.data.deref()
}
}
/////////////////////////////////////////////////////////
// MmapMemory
// #[derive(Clone)]
// pub struct Directory {
// dir: Rc<Dir>,
// }
//
// impl Directory {
// fn segment(&self, segment_id: &SegmentId) -> Segment {
// Segment {
// directory: self.dir.clone(),
// segment_id: segment_id.clone()
// }
// }
//
// pub fn new_segment(&self,) -> Segment {
// self.segment(&generate_segment_name())
// }
//
// fn from<T: Dir + 'static>(directory: T) -> Directory {
// Directory {
// dir: Rc::new(directory),
// }
// }
//
// // pub fn open(path_str: &str) -> Directory {
// // let path = PathBuf::from(path_str);
// // Directory::from(FileDirectory::for_path(path))
// // }
//
// pub fn in_mem() -> Directory {
// Directory::from(MemDirectory::new())
// }
// }
//
// impl Dir for Directory {
// // fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<Box<Borrow<[u8]>>> {
// // self.dir.get_data(segment_id, component)
// // }
//
pub struct MmapMemory(Mmap);
impl MemoryPointer for MmapMemory {
fn data(&self) -> &[u8] {
let &MmapMemory(ref mmap) = self;
unsafe {
mmap.as_slice()
}
}
}
// fn open_writable<'a>(&'a self, path: &PathBuf) -> Result<Box<BorrowMut<Write>>> {
// self.dir.open_writable(path)
// }
// }
#[derive(Clone)]
pub struct SharedMmapMemory(Arc<MmapMemory>);
// pub enum SegmentComponent {
// POSTINGS,
// POSITIONS,
// TERMS,
// }
//
// pub struct Segment {
// directory: Rc<Dir>,
// segment_id: SegmentId,
// }
//
// impl Segment {
// fn path_suffix(component: SegmentComponent)-> &'static str {
// match component {
// SegmentComponent::POSTINGS => ".idx",
// SegmentComponent::POSITIONS => ".pos",
// SegmentComponent::TERMS => ".term",
// }
// }
//
// fn get_path(&self, component: SegmentComponent) -> PathBuf {
// let mut filepath = self.index_path.clone();
// let SegmentId(ref segment_id_str) = self.segment_id;
// let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component);
// filepath.push(filename);
// filepath
// }
//
// // pub fn get_data(&self, component: SegmentComponent) -> Result<Box<Borrow<[u8]>>> {
// // self.directory.get_data(&self.segment_id, component)
// // }
// pub fn open_writable(&self, component: SegmentComponent) -> Result<Box<BorrowMut<Write>>> {
// let path = self.get_path(component);
// self.directory.open_writable(path)
// }
// }
impl SharedMmapMemory {
pub fn new(mmap_memory: MmapMemory) -> SharedMmapMemory {
SharedMmapMemory(Arc::new(mmap_memory))
}
}
//////////////////////////////////////////////////////////
// FileDirectory
pub struct FileDirectory {
index_path: PathBuf,
mmap_cache: RefCell<HashMap<PathBuf, SharedMmapMemory >>,
}
impl FileDirectory {
pub fn for_path(path: PathBuf)-> FileDirectory {
FileDirectory {
index_path: path,
mmap_cache: RefCell::new(HashMap::new()),
}
}
fn get_or_open_mmap(&self, filepath: &PathBuf)->Result<SharedMmapMemory, io::Error> {
if !self.mmap_cache.borrow().contains_key(filepath) {
let file = try!(File::open(filepath));
let mmap = MmapMemory(try!(Mmap::open(&file, Protection::Read)));
self.mmap_cache.borrow_mut().insert(filepath.clone(), SharedMmapMemory::new(mmap));
}
let shared_map: SharedMmapMemory = self.mmap_cache.borrow().get(filepath).unwrap().clone();
Ok(shared_map)
}
}
impl Dir for FileDirectory {
fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<SharedMmapMemory, io::Error> {
let mut filepath = self.index_path.clone();
let SegmentId(ref segment_id_str) = *segment_id;
let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component);
filepath.push(filename);
self.get_or_open_mmap(&filepath)
}
}
//////////////////////////////////////////////////////////
// FileDirectory
//
// /////////////////////////////////////////////////////////
// // ResidentMemoryPointer
//
// pub struct ResidentMemoryPointer {
// data: Box<[u8]>,
// }
//
// impl Borrow<[u8]> for ResidentMemoryPointer {
// fn borrow(&self) -> &[u8] {
// self.data.deref()
// }
// }
//
//
// //////////////////////////////////////////////////////////
// // MemDirectory
// //
// pub struct MemDirectory {
// dir: HashMap<PathBuf, Rc<String>>,
// }
//
// impl MemDirectory {
// pub fn new()-> MemDirectory {
// MemDirectory {
// dir: HashMap::new(),
// }
// }
//
// // fn get_path(&self, segment_id: &SegmentId, component: SegmentComponent) -> PathBuf {
// // let mut filepath = self.index_path.clone();
// // let SegmentId(ref segment_id_str) = *segment_id;
// // let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component);
// // filepath.push(filename);
// // filepath
// // }
// }
//
// impl Directory for MemDirectory {
//
// type Write = Rc<String>;
//
// // fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<Box<Borrow<[u8]>>> {
// // let path = self.get_path(segment_id, component);
// // match self.dir.get(&path) {
// // Some(buf) => Ok(buf.clone()),
// // None => Err(Error::FileNotFound(String::from("File not found"))), // TODO add filename
// // }
// // }
//
// fn open_writable<'a>(&'a self, path: &PathBuf) -> Result<Rc<String>> {
// if self.dir.contains_key(path) {
// return Err(Error::ReadOnly(String::from("Cannot open an already written buffer.")));
// }
// self.dir.insert(path.clone(), Rc::new(String::new()));
// self.dir.get(path);
// Ok(Box::new())
//
// }
// }
//
//
//
//
//
//
//
//
//
//
//
//
//
pub struct MemDirectory {
dir: HashMap<PathBuf, SharedMmapMemory>,
}
impl MemDirectory {
pub fn new()-> MemDirectory {
MemDirectory {
dir: HashMap::new(),
}
}
}
impl Dir for MemDirectory {
fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<SharedMmapMemory, io::Error> {
let SegmentId(ref segment_id_str) = *segment_id;
let mut path = PathBuf::from(segment_id_str);
path.push(Segment::path_suffix(component));
match self.dir.get(&path) {
Some(buf) => Ok(buf.clone()),
None => Err(io::Error::new(io::ErrorKind::NotFound, "File does not exists")),
}
}
}
//
//
// #[derive(Clone)]
// pub struct SharedMmapMemory(Arc<MmapMemory>);
//
// impl SharedMmapMemory {
// pub fn new(mmap_memory: MmapMemory) -> SharedMmapMemory {
// SharedMmapMemory(Arc::new(mmap_memory))
// }
// }
//
//
// /////////////////////////////////////////////////////////
// // MmapMemory
// //
//
// pub struct MmapMemory(Mmap);
//
// impl MemoryPointer for MmapMemory {
// fn data(&self) -> &[u8] {
// let &MmapMemory(ref mmap) = self;
// unsafe {
// mmap.as_slice()
// }
// }
// }
//
// //////////////////////////////////////////////////////////
// // FileDirectory
//
// pub struct FileDirectory {
// index_path: PathBuf,
// mmap_cache: RefCell<HashMap<PathBuf, SharedMmapMemory >>,
// }
//
// impl FileDirectory {
// pub fn for_path(path: PathBuf)-> FileDirectory {
// FileDirectory {
// index_path: path,
// mmap_cache: RefCell::new(HashMap::new()),
// }
// }
//
// fn get_or_open_mmap(&self, filepath: &PathBuf)->Result<[u8]> {
// if !self.mmap_cache.borrow().contains_key(filepath) {
// let file = try!(File::open(filepath));
// let mmap = MmapMemory(try!(Mmap::open(&file, Protection::Read)));
// self.mmap_cache.borrow_mut().insert(filepath.clone(), SharedMmapMemory::new(mmap));
// }
// let shared_map: SharedMmapMemory = self.mmap_cache.borrow().get(filepath).unwrap().clone();
// Ok(shared_map)
// }
//
// fn get_path(&self, segment_id: &SegmentId, component: SegmentComponent) -> PathBuf {
// let mut filepath = self.index_path.clone();
// let SegmentId(ref segment_id_str) = *segment_id;
// let filename = String::new() + segment_id_str + "." + Segment::path_suffix(component);
// filepath.push(filename);
// filepath
// }
// }
//
// impl Dir for FileDirectory {
// fn get_data(&self, segment_id: &SegmentId, component: SegmentComponent) -> Result<[u8]> {
// let filepath = self.get_path(segment_id, component);
// self.get_or_open_mmap(&filepath)
// }
//
// fn open_writable<'a>(&'a self, segment_id: &SegmentId, component: SegmentComponent) -> Result<Write + 'a> {
// Err(Error::IOError("e"))
// }
// }

View File

@@ -1,8 +1,12 @@
use std::result;
use std::io;
#[derive(Debug)]
pub enum Error {
IOError(String),
WriteError(String),
IOError(io::ErrorKind, String),
FileNotFound(String),
ReadOnly(String),
}
pub type Result<T> = result::Result<T, Error>;

View File

@@ -1,6 +1,4 @@
use core::directory::Directory;
use core::global::DocId;
use core::schema::*;
pub struct SegmentIndexReader {
directory: Directory,

View File

@@ -3,6 +3,7 @@ use std::io;
use std::slice;
use core::global::*;
use core::schema::*;
use core::codec::*;
use core::directory::Directory;
use core::analyzer::tokenize;
use std::collections::{HashMap, BTreeMap};
@@ -13,6 +14,7 @@ use std::mem;
use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
use std::iter::Peekable;
use core::serial::*;
use core::error::*;
pub struct SimplePostingsWriter {
doc_ids: Vec<DocId>,
@@ -100,24 +102,13 @@ impl IndexWriter {
self.max_doc += 1;
}
pub fn close(self) -> ClosedIndexWriter {
ClosedIndexWriter {
index_writer: self
}
}
pub fn sync(&mut self,) -> Result<(), io::Error> {
self.directory.new_segment();
Ok(())
pub fn commit(self,) -> Result<usize> {
let segment = self.directory.new_segment();
SimpleCodec::write(&self, &segment)
}
}
pub struct ClosedIndexWriter {
index_writer: IndexWriter,
}
//////////////////////////////////
@@ -244,14 +235,14 @@ impl<'a> TermCursor<'a> for CIWTermCursor<'a> {
//
// TODO use a Term type
//
impl<'a> SerializableSegment<'a> for ClosedIndexWriter {
impl<'a> SerializableSegment<'a> for IndexWriter {
type TermCur = CIWTermCursor<'a>;
fn term_cursor(&'a self) -> CIWTermCursor<'a> {
let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.index_writer.term_writers.iter();
let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter();
let (field, field_writer) = field_it.next().unwrap(); // TODO handle no field
let mut term_cursor = CIWTermCursor {
let term_cursor = CIWTermCursor {
field_it: field_it,
form_it: CIWFormCursor {
term_it: field_writer.term_index.iter(),

View File

@@ -1,3 +1,5 @@
#[allow(unused_imports)]
#[macro_use]
extern crate lazy_static;
extern crate fst;

View File

@@ -10,7 +10,7 @@ use tantivy::core::serial::*;
use tantivy::core::schema::*;
use tantivy::core::codec::SimpleCodec;
use tantivy::core::global::*;
use tantivy::core::writer::{IndexWriter, ClosedIndexWriter};
use tantivy::core::writer::IndexWriter;
use tantivy::core::directory::{Directory, generate_segment_name, SegmentId};
use std::ops::DerefMut;
use tantivy::core::writer::SimplePostingsWriter;
@@ -36,7 +36,7 @@ fn test_tokenizer() {
#[test]
fn test_indexing() {
let directory = Directory::in_mem();
let directory = Directory::from("/home/paul/temp/idx");
{
let mut index_writer = IndexWriter::open(&directory);
{
@@ -54,7 +54,11 @@ fn test_indexing() {
doc.set(Field(1), "a b c d");
index_writer.add(doc);
}
let mut closed_index_writer: ClosedIndexWriter = index_writer.close();
let commit_result = index_writer.commit();
// println!("{:?}", commit_result.err());
assert!(commit_result.is_ok());
// assert!(commit_result.is_ok());
// SimpleCodec::write(closed_index_writer, output);
// let mut term_cursor = closed_index_writer.term_cursor();
// loop {
@@ -70,7 +74,7 @@ fn test_indexing() {
// }
// }
// }
assert!(false);
// assert!(false);
}
{
// TODO add index opening stuff