master merged in feature branch

This commit is contained in:
Paul Masurel
2017-03-27 09:27:23 +09:00
12 changed files with 386 additions and 207 deletions

View File

@@ -24,8 +24,6 @@ rustc-serialize = "0.3"
log = "0.3.6"
combine = "2.2"
tempdir = "0.3"
bincode = "0.5"
libc = {version = "0.2.20", optional=true}
num_cpus = "1.2"
@@ -37,12 +35,15 @@ uuid = { version = "0.4", features = ["v4", "rustc-serialize"] }
chan = "0.1"
version = "2"
crossbeam = "0.2"
futures = "0.1.9"
futures-cpupool = "0.1.2"
[target.'cfg(windows)'.dependencies]
winapi = "*"
[dev-dependencies]
rand = "0.3"
env_logger = "0.4"
[build-dependencies]
gcc = {version = "0.3", optional=true}

24
appveyor.yml Normal file
View File

@@ -0,0 +1,24 @@
# Appveyor configuration template for Rust using rustup for Rust installation
# https://github.com/starkat99/appveyor-rust
os: Visual Studio 2015
environment:
matrix:
- channel: nightly
target: x86_64-pc-windows-msvc
- channel: nightly
target: x86_64-pc-windows-gnu
msys_bits: 64
install:
- appveyor DownloadFile https://win.rustup.rs/ -FileName rustup-init.exe
- rustup-init -yv --default-toolchain %channel% --default-host %target%
- set PATH=%PATH%;%USERPROFILE%\.cargo\bin
- if defined msys_bits set PATH=%PATH%;C:\msys64\mingw%msys_bits%\bin
- rustc -vV
- cargo -vV
build: false
test_script:
- REM SET RUST_LOG=tantivy,test & cargo test --verbose

View File

@@ -1,37 +1,50 @@
#[cfg(feature= "simdcompression")]
#[cfg(feature = "simdcompression")]
mod build {
extern crate gcc;
use std::process::Command;
pub fn build() {
Command::new("make")
.current_dir("cpp/simdcomp")
.output()
.unwrap_or_else(|e| { panic!("Failed to make simdcomp: {}", e) });
gcc::Config::new()
.flag("-O3")
.flag("-mssse3")
.include("./cpp/simdcomp/include")
.object("cpp/simdcomp/avxbitpacking.o")
.object("cpp/simdcomp/simdintegratedbitpacking.o")
.object("cpp/simdcomp/simdbitpacking.o")
.object("cpp/simdcomp/simdpackedsearch.o")
.object("cpp/simdcomp/simdcomputil.o")
.object("cpp/simdcomp/simdpackedselect.o")
.object("cpp/simdcomp/simdfor.o")
.file("cpp/simdcomp_wrapper.c")
.compile("libsimdcomp.a");
let mut config = gcc::Config::new();
config.include("./cpp/simdcomp/include")
.file("cpp/simdcomp/src/avxbitpacking.c")
.file("cpp/simdcomp/src/simdintegratedbitpacking.c")
.file("cpp/simdcomp/src/simdbitpacking.c")
.file("cpp/simdcomp/src/simdpackedsearch.c")
.file("cpp/simdcomp/src/simdcomputil.c")
.file("cpp/simdcomp/src/simdpackedselect.c")
.file("cpp/simdcomp/src/simdfor.c")
.file("cpp/simdcomp_wrapper.c");
if !cfg!(debug_assertions) {
config.opt_level(3);
if cfg!(target_env = "msvc") {
config.define("NDEBUG", None)
.flag("/Gm-")
.flag("/GS-")
.flag("/Gy")
.flag("/Oi")
.flag("/GL");
} else {
config.flag("-msse4.1")
.flag("-march=native");
}
}
config.compile("libsimdcomp.a");
// Workaround for linking static libraries built with /GL
// https://github.com/rust-lang/rust/issues/26003
if !cfg!(debug_assertions) && cfg!(target_env = "msvc") {
println!("cargo:rustc-link-lib=dylib=simdcomp");
}
}
}
#[cfg(not(feature= "simdcompression"))]
#[cfg(not(feature = "simdcompression"))]
mod build {
pub fn build() {
}
pub fn build() {}
}
fn main() {
build::build();
}

View File

@@ -52,7 +52,7 @@
<div class="pilwrap ">
<a class="pilcrow" href="#section-2">&#182;</a>
</div>
<p>Lets create a temporary directory for the
<p>Lets create a temporary directory for the
sake of this example</p>
</div>
@@ -60,7 +60,7 @@ sake of this example</p>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">if</span> <span class="hljs-keyword">let</span> <span class="hljs-literal">Ok</span>(dir) = TempDir::new(<span class="hljs-string">"tantivy_example_dir"</span>) {
run_example(dir.path()).unwrap();
dir.close().unwrap();
}
}
}
@@ -78,7 +78,7 @@ sake of this example</p>
<h1 id="defining-the-schema">Defining the schema</h1>
<p>The Tantivy index requires a very strict schema.
The schema declares which fields are in the index,
and for each field, its type and “the way it should
and for each field, its type and “the way it should
be indexed”.</p>
</div>
@@ -111,12 +111,12 @@ be indexed”.</p>
We want full-text search for it, and we want to be able
to retrieve the document after the search.</p>
<p>TEXT | STORED is some syntactic sugar to describe
that. </p>
that.</p>
<p><code>TEXT</code> means the field should be tokenized and indexed,
along with its term frequency and term positions.</p>
<p><code>STORED</code> means that the field will also be saved
in a compressed, row-oriented key-value store.
This store is useful to reconstruct the
This store is useful to reconstruct the
documents that were selected during the search phase.</p>
</div>
@@ -139,7 +139,7 @@ to retrieve the body after the search.</p>
</div>
<div class="content"><div class='highlight'><pre> schema_builder.add_text_field(<span class="hljs-string">"body"</span>, TEXT);
<span class="hljs-keyword">let</span> schema = schema_builder.build();</pre></div></div>
</li>
@@ -173,14 +173,12 @@ with our schema in the directory.</p>
There must be only one writer at a time.
This single <code>IndexWriter</code> is already
multithreaded.</p>
<p>Here we use a buffer of 1 GB. Using a bigger
heap for the indexer can increase its throughput.
This buffer will be split between the indexing
threads.</p>
<p>Here we use a buffer of 50MB per thread. Using a bigger
heap for the indexer can increase its throughput.</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> <span class="hljs-keyword">mut</span> index_writer = <span class="hljs-built_in">try!</span>(index.writer(<span class="hljs-number">1_000_000_000</span>));</pre></div></div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> <span class="hljs-keyword">mut</span> index_writer = <span class="hljs-built_in">try!</span>(index.writer(<span class="hljs-number">50_000_000</span>));</pre></div></div>
</li>
@@ -213,10 +211,12 @@ one by one in a Document object.</p>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> title = schema.get_field(<span class="hljs-string">"title"</span>).unwrap();
<span class="hljs-keyword">let</span> body = schema.get_field(<span class="hljs-string">"body"</span>).unwrap();
<span class="hljs-keyword">let</span> <span class="hljs-keyword">mut</span> old_man_doc = Document::<span class="hljs-keyword">default</span>();
old_man_doc.add_text(title, <span class="hljs-string">"The Old Man and the Sea"</span>);
old_man_doc.add_text(body, <span class="hljs-string">"He was an old man who fished alone in a skiff in the Gulf Stream and he had gone eighty-four days now without taking a fish."</span>);</pre></div></div>
old_man_doc.add_text(body,
<span class="hljs-string">"He was an old man who fished alone in a skiff in the Gulf Stream and \
he had gone eighty-four days now without taking a fish."</span>);</pre></div></div>
</li>
@@ -231,7 +231,7 @@ one by one in a Document object.</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-built_in">try!</span>(index_writer.add_document(old_man_doc));</pre></div></div>
<div class="content"><div class='highlight'><pre> index_writer.add_document(old_man_doc);</pre></div></div>
</li>
@@ -248,13 +248,13 @@ a document object directly from json.</p>
</div>
<div class="content"><div class='highlight'><pre>
<div class="content"><div class='highlight'><pre>
<span class="hljs-keyword">let</span> mice_and_men_doc = <span class="hljs-built_in">try!</span>(schema.parse_document(r#<span class="hljs-string">"{
"</span>title<span class="hljs-string">": "</span>Of Mice and Men<span class="hljs-string">",
"</span>body<span class="hljs-string">": "</span>few miles south of Soledad, the Salinas River drops <span class="hljs-keyword">in</span> close to the hillside bank and runs deep and green. The water is warm too, <span class="hljs-keyword">for</span> it has slipped twinkling over the yellow sands <span class="hljs-keyword">in</span> the sunlight before reaching the narrow pool. On one side of the river the golden foothill slopes curve up to the strong and rocky Gabilan Mountains, but on the valley side the water is lined with trees—willows fresh and green with every spring, carrying <span class="hljs-keyword">in</span> their lower leaf junctures the debris of the winters flooding; and sycamores with mottled, white,recumbent limbs and branches that arch over the pool<span class="hljs-string">"
}"</span>#));
<span class="hljs-built_in">try!</span>(index_writer.add_document(mice_and_men_doc));</pre></div></div>
index_writer.add_document(mice_and_men_doc);</pre></div></div>
</li>
@@ -275,7 +275,7 @@ The following document has two titles.</p>
"</span>title<span class="hljs-string">": ["</span>Frankenstein<span class="hljs-string">", "</span>The Modern Promotheus<span class="hljs-string">"],
"</span>body<span class="hljs-string">": "</span>You will rejoice to hear that no disaster has accompanied the commencement of an enterprise which you have regarded with such evil forebodings. I arrived here yesterday, and my first task is to assure my dear sister of my welfare and increasing confidence <span class="hljs-keyword">in</span> the success of my undertaking.<span class="hljs-string">"
}"</span>#));
<span class="hljs-built_in">try!</span>(index_writer.add_document(frankenstein_doc));</pre></div></div>
index_writer.add_document(frankenstein_doc);</pre></div></div>
</li>
@@ -288,7 +288,7 @@ The following document has two titles.</p>
</div>
<p>This is an example, so we will only index 3 documents
here. You can check out tantivys tutorial to index
the English wikipedia. Tantivys indexing is rather fast.
the English wikipedia. Tantivys indexing is rather fast.
Indexing 5 million articles of the English wikipedia takes
around 4 minutes on my computer!</p>
@@ -343,15 +343,13 @@ commit.</p>
<a class="pilcrow" href="#section-17">&#182;</a>
</div>
<h1 id="searching">Searching</h1>
<p>Lets search our index. We start
by creating a searcher. There can be more
than one searcher at a time.</p>
<p>You should create a searcher
every time you start a “search query”.</p>
<p>Lets search our index. Start by reloading
searchers in the index. This should be done
after every commit().</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> searcher = index.searcher();</pre></div></div>
<div class="content"><div class='highlight'><pre> <span class="hljs-built_in">try!</span>(index.load_searchers());</pre></div></div>
</li>
@@ -362,14 +360,13 @@ every time you start a “search query”.</p>
<div class="pilwrap ">
<a class="pilcrow" href="#section-18">&#182;</a>
</div>
<p>The query parser can interpret human queries.
Here, if the user does not specify which
field they want to search, tantivy will search
in both title and body.</p>
<p>Afterwards create one (or more) searchers.</p>
<p>You should create a searcher
every time you start a “search query”.</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> query_parser = QueryParser::new(index.schema(), <span class="hljs-built_in">vec!</span>(title, body));</pre></div></div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> searcher = index.searcher();</pre></div></div>
</li>
@@ -380,6 +377,24 @@ in both title and body.</p>
<div class="pilwrap ">
<a class="pilcrow" href="#section-19">&#182;</a>
</div>
<p>The query parser can interpret human queries.
Here, if the user does not specify which
field they want to search, tantivy will search
in both title and body.</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> query_parser = QueryParser::new(index.schema(), <span class="hljs-built_in">vec!</span>[title, body]);</pre></div></div>
</li>
<li id="section-20">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-20">&#182;</a>
</div>
<p>QueryParser may fail if the query is not in the right
format. For user facing applications, this can be a problem.
A ticket has been opened regarding this problem.</p>
@@ -391,11 +406,11 @@ A ticket has been opened regarding this problem.</p>
</li>
<li id="section-20">
<li id="section-21">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-20">&#182;</a>
<a class="pilcrow" href="#section-21">&#182;</a>
</div>
<p>A query defines a set of documents, as
well as the way they should be scored.</p>
@@ -408,36 +423,20 @@ any document matching at least one of our terms.</p>
</li>
<li id="section-21">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-21">&#182;</a>
</div>
<h3 id="collectors">Collectors</h3>
<p>We are not interested in all of the documents but
only in the top 10. Keeping track of our top 10 best documents
is the role of the TopCollector.</p>
</div>
<div class="content"><div class='highlight'><pre>
<span class="hljs-keyword">let</span> <span class="hljs-keyword">mut</span> top_collector = TopCollector::with_limit(<span class="hljs-number">10</span>);</pre></div></div>
</li>
<li id="section-22">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-22">&#182;</a>
</div>
<p>We can now perform our query.</p>
<h3 id="collectors">Collectors</h3>
<p>We are not interested in all of the documents but
only in the top 10. Keeping track of our top 10 best documents
is the role of the TopCollector.</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-built_in">try!</span>(searcher.search(&amp;query, &amp;<span class="hljs-keyword">mut</span> top_collector)));</pre></div></div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> <span class="hljs-keyword">mut</span> top_collector = TopCollector::with_limit(<span class="hljs-number">10</span>);</pre></div></div>
</li>
@@ -448,12 +447,11 @@ is the role of the TopCollector.</p>
<div class="pilwrap ">
<a class="pilcrow" href="#section-23">&#182;</a>
</div>
<p>Our top collector now contains the 10
most relevant doc ids…</p>
<p>We can now perform our query.</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> doc_addresses = top_collector.docs();</pre></div></div>
<div class="content"><div class='highlight'><pre> <span class="hljs-built_in">try!</span>(searcher.search(&amp;*query, &amp;<span class="hljs-keyword">mut</span> top_collector));</pre></div></div>
</li>
@@ -464,7 +462,23 @@ most relevant doc ids…</p>
<div class="pilwrap ">
<a class="pilcrow" href="#section-24">&#182;</a>
</div>
<p>The actual documents still need to be
<p>Our top collector now contains the 10
most relevant doc ids…</p>
</div>
<div class="content"><div class='highlight'><pre> <span class="hljs-keyword">let</span> doc_addresses = top_collector.docs();</pre></div></div>
</li>
<li id="section-25">
<div class="annotation">
<div class="pilwrap ">
<a class="pilcrow" href="#section-25">&#182;</a>
</div>
<p>The actual documents still need to be
retrieved from Tantivys store.</p>
<p>Since the body field was not configured as stored,
the document returned will only contain
@@ -472,10 +486,10 @@ a title.</p>
</div>
<div class="content"><div class='highlight'><pre>
<div class="content"><div class='highlight'><pre>
<span class="hljs-keyword">for</span> doc_address <span class="hljs-keyword">in</span> doc_addresses {
<span class="hljs-keyword">let</span> retrieved_doc = <span class="hljs-built_in">try!</span>(searcher.doc(&amp;doc_address));
<span class="hljs-built_in">println!</span>(<span class="hljs-string">"{}"</span>, schema.to_json(&amp;retrieved_doc));
<span class="hljs-keyword">let</span> retrieved_doc = <span class="hljs-built_in">try!</span>(searcher.doc(&amp;doc_address));
<span class="hljs-built_in">println!</span>(<span class="hljs-string">"{}"</span>, schema.to_json(&amp;retrieved_doc));
}
<span class="hljs-literal">Ok</span>(())

View File

@@ -10,105 +10,105 @@ use tantivy::collector::TopCollector;
use tantivy::query::QueryParser;
fn main() {
// Let's create a temporary directory for the
// Let's create a temporary directory for the
// sake of this example
if let Ok(dir) = TempDir::new("tantivy_example_dir") {
run_example(dir.path()).unwrap();
dir.close().unwrap();
}
}
}
fn run_example(index_path: &Path) -> tantivy::Result<()> {
// # Defining the schema
//
// The Tantivy index requires a very strict schema.
// The schema declares which fields are in the index,
// and for each field, its type and "the way it should
// and for each field, its type and "the way it should
// be indexed".
// first we need to define a schema ...
let mut schema_builder = SchemaBuilder::default();
// Our first field is title.
// We want full-text search for it, and we want to be able
// to retrieve the document after the search.
//
// TEXT | STORED is some syntactic sugar to describe
// that.
//
// that.
//
// `TEXT` means the field should be tokenized and indexed,
// along with its term frequency and term positions.
//
// `STORED` means that the field will also be saved
// in a compressed, row-oriented key-value store.
// This store is useful to reconstruct the
// This store is useful to reconstruct the
// documents that were selected during the search phase.
schema_builder.add_text_field("title", TEXT | STORED);
// Our first field is body.
// We want full-text search for it, and we want to be able
// to retrieve the body after the search.
schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let schema = schema_builder.build();
// # Indexing documents
//
// Let's create a brand new index.
//
//
// This will actually just save a meta.json
// with our schema in the directory.
let index = try!(Index::create(index_path, schema.clone()));
// To insert document we need an index writer.
// There must be only one writer at a time.
// This single `IndexWriter` is already
// multithreaded.
//
// Here we use a buffer of 1 GB. Using a bigger
// Here we use a buffer of 50MB per thread. Using a bigger
// heap for the indexer can increase its throughput.
// This buffer will be split between the indexing
// threads.
let mut index_writer = try!(index.writer(1_000_000_000));
let mut index_writer = try!(index.writer(50_000_000));
// Let's index our documents!
// We first need a handle on the title and the body field.
// ### Create a document "manually".
//
// We can create a document manually, by setting the fields
// one by one in a Document object.
let title = schema.get_field("title").unwrap();
let body = schema.get_field("body").unwrap();
let mut old_man_doc = Document::default();
old_man_doc.add_text(title, "The Old Man and the Sea");
old_man_doc.add_text(body, "He was an old man who fished alone in a skiff in the Gulf Stream and he had gone eighty-four days now without taking a fish.");
old_man_doc.add_text(body,
"He was an old man who fished alone in a skiff in the Gulf Stream and \
he had gone eighty-four days now without taking a fish.");
// ... and add it to the `IndexWriter`.
index_writer.add_document(old_man_doc);
// ### Create a document directly from json.
//
// Alternatively, we can use our schema to parse
// a document object directly from json.
let mice_and_men_doc = try!(schema.parse_document(r#"{
"title": "Of Mice and Men",
"body": "few miles south of Soledad, the Salinas River drops in close to the hillside bank and runs deep and green. The water is warm too, for it has slipped twinkling over the yellow sands in the sunlight before reaching the narrow pool. On one side of the river the golden foothill slopes curve up to the strong and rocky Gabilan Mountains, but on the valley side the water is lined with trees—willows fresh and green with every spring, carrying in their lower leaf junctures the debris of the winters flooding; and sycamores with mottled, white,recumbent limbs and branches that arch over the pool"
}"#));
index_writer.add_document(mice_and_men_doc);
// Multi-valued field are allowed, they are
// expressed in JSON by an array.
// The following document has two titles.
@@ -117,19 +117,19 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
"body": "You will rejoice to hear that no disaster has accompanied the commencement of an enterprise which you have regarded with such evil forebodings. I arrived here yesterday, and my first task is to assure my dear sister of my welfare and increasing confidence in the success of my undertaking."
}"#));
index_writer.add_document(frankenstein_doc);
// This is an example, so we will only index 3 documents
// here. You can check out tantivy's tutorial to index
// the English wikipedia. Tantivy's indexing is rather fast.
// the English wikipedia. Tantivy's indexing is rather fast.
// Indexing 5 million articles of the English wikipedia takes
// around 4 minutes on my computer!
// ### Committing
//
//
// At this point our documents are not searchable.
//
//
//
// We need to call .commit() explicitly to force the
// index_writer to finish processing the documents in the queue,
// flush the current index to the disk, and advertise
@@ -137,22 +137,25 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
//
// This call is blocking.
try!(index_writer.commit());
// If `.commit()` returns correctly, then all of the
// documents that have been added are guaranteed to be
// persistently indexed.
//
//
// In the scenario of a crash or a power failure,
// tantivy behaves as if has rolled back to its last
// commit.
// # Searching
//
// Let's search our index. We start
// by creating a searcher. There can be more
// than one searcher at a time.
//
// Let's search our index. Start by reloading
// searchers in the index. This should be done
// after every commit().
try!(index.load_searchers());
// Afterwards create one (or more) searchers.
//
// You should create a searcher
// every time you start a "search query".
let searcher = index.searcher();
@@ -161,46 +164,45 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// Here, if the user does not specify which
// field they want to search, tantivy will search
// in both title and body.
let query_parser = QueryParser::new(index.schema(), vec!(title, body));
let query_parser = QueryParser::new(index.schema(), vec![title, body]);
// QueryParser may fail if the query is not in the right
// format. For user facing applications, this can be a problem.
// A ticket has been opened regarding this problem.
let query = try!(query_parser.parse_query("sea whale"));
// A query defines a set of documents, as
// well as the way they should be scored.
//
//
// A query created by the query parser is scored according
// to a metric called Tf-Idf, and will consider
// any document matching at least one of our terms.
// ### Collectors
// ### Collectors
//
// We are not interested in all of the documents but
// We are not interested in all of the documents but
// only in the top 10. Keeping track of our top 10 best documents
// is the role of the TopCollector.
let mut top_collector = TopCollector::with_limit(10);
// We can now perform our query.
try!(searcher.search(&*query, &mut top_collector));
// Our top collector now contains the 10
// Our top collector now contains the 10
// most relevant doc ids...
let doc_addresses = top_collector.docs();
// The actual documents still need to be
// The actual documents still need to be
// retrieved from Tantivy's store.
//
//
// Since the body field was not configured as stored,
// the document returned will only contain
// a title.
for doc_address in doc_addresses {
let retrieved_doc = try!(searcher.doc(&doc_address));
println!("{}", schema.to_json(&retrieved_doc));
let retrieved_doc = try!(searcher.doc(&doc_address));
println!("{}", schema.to_json(&retrieved_doc));
}
Ok(())

View File

@@ -5,7 +5,6 @@ const COMPRESSED_BLOCK_MAX_SIZE: usize = NUM_DOCS_PER_BLOCK * 4 + 1;
mod simdcomp {
use libc::size_t;
#[link(name = "simdcomp")]
extern {
pub fn compress_sorted(
data: *const u32,

View File

@@ -86,7 +86,9 @@ impl ManagedDirectory {
managed_has_changed |= managed_paths_write.remove(&file_to_delete);
}
FileError::IOError(_) => {
error!("Failed to delete {:?}", file_to_delete);
if !cfg!(target_os = "windows") {
error!("Failed to delete {:?}", file_to_delete);
}
}
}
@@ -244,4 +246,33 @@ mod tests {
}
}
#[test]
fn test_managed_directory_gc_while_mmapped() {
let tempdir = TempDir::new("index").unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
let living_files = HashSet::new();
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::new(mmap_directory).unwrap();
managed_directory.atomic_write(*TEST_PATH1, &vec!(0u8,1u8)).unwrap();
assert!(managed_directory.exists(*TEST_PATH1));
let _mmap_read = managed_directory.open_read(*TEST_PATH1).unwrap();
managed_directory.garbage_collect(living_files.clone());
if cfg!(target_os = "windows") {
// On Windows, gc should try and fail the file as it is mmapped.
assert!(managed_directory.exists(*TEST_PATH1));
// unmap should happen here.
drop(_mmap_read);
// The file should still be in the list of managed file and
// eventually be deleted once mmap is released.
managed_directory.garbage_collect(living_files);
assert!(!managed_directory.exists(*TEST_PATH1));
}
else {
assert!(!managed_directory.exists(*TEST_PATH1));
}
}
}

View File

@@ -23,7 +23,6 @@ use std::sync::RwLock;
use std::sync::Weak;
use tempdir::TempDir;
fn open_mmap(full_path: &PathBuf) -> result::Result<Option<Arc<Mmap>>, FileError> {
let convert_file_error = |err: io::Error| {
if err.kind() == io::ErrorKind::NotFound {
@@ -219,12 +218,28 @@ impl MmapDirectory {
/// Sync the root directory.
/// In certain FS, this is required to persistently create
/// a file.
fn sync_directory(&self,) -> Result<(), io::Error> {
let fd = try!(File::open(&self.root_path));
fn sync_directory(&self) -> Result<(), io::Error> {
let mut open_opts = OpenOptions::new();
// Linux needs read to be set, otherwise returns EINVAL
// write must not be set, or it fails with EISDIR
open_opts.read(true);
// On Windows, opening a directory requires FILE_FLAG_BACKUP_SEMANTICS
// and calling sync_all() only works if write access is requested.
#[cfg(windows)]
{
use std::os::windows::fs::OpenOptionsExt;
use winapi::winbase;
open_opts.write(true)
.custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS);
}
let fd = try!(open_opts.open(&self.root_path));
try!(fd.sync_all());
Ok(())
}
/// Returns some statistical information
/// about the Mmap cache.
///

View File

@@ -60,31 +60,37 @@ mod tests {
fn test_simple(directory: &mut Directory) {
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7,3,5]).unwrap();
write_file.flush().unwrap();
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7,3,5]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(!directory.exists(*TEST_PATH));
}
fn test_seek(directory: &mut Directory) {
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[4, 3, 7,3,5]).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
write_file.write_all(&[3,1]).unwrap();
write_file.flush().unwrap();
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[4, 3, 7,3,5]).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
write_file.write_all(&[3,1]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]);
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]);
assert!(directory.delete(*TEST_PATH).is_ok());
}
@@ -118,14 +124,27 @@ mod tests {
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[1, 2, 3, 4]).unwrap();
write_file.flush().unwrap();
let read_handle = directory.open_read(*TEST_PATH).unwrap();
{
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
let read_handle = directory.open_read(*TEST_PATH).unwrap();
{
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
// Mapped files can't be deleted on Windows
if !cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
}
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
}
}
if cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
}
assert!(directory.open_read(*TEST_PATH).is_err());
assert!(directory.delete(*TEST_PATH).is_err());
}
fn test_directory(directory: &mut Directory) {

View File

@@ -304,11 +304,11 @@ fn index_documents(heap: &mut Heap,
else { None } }
);
segment_updater
Ok(
segment_updater
.add_segment(generation, segment_entry)
.wait()
.map_err(|_| Error::ErrorInThread("Could not add segment.".to_string()))
)
}
@@ -320,7 +320,9 @@ impl IndexWriter {
// dropping the last reference to the segment_updater.
drop(self.document_sender);
let former_workers_handles = mem::replace(&mut self.workers_join_handle, vec!());
debug!("wait {} merging threads START", former_workers_handles.len());
for join_handle in former_workers_handles {
try!(join_handle.join()
.expect("Indexing Worker thread panicked")
@@ -330,11 +332,14 @@ impl IndexWriter {
}
drop(self.workers_join_handle);
self.segment_updater
let result = self.segment_updater
.wait_merging_thread()
.map_err(|_|
Error::ErrorInThread("Failed to join merging thread.".to_string())
)
);
debug!("wait merging threads DONE");
result
}
/// Spawns a new worker thread for indexing.
@@ -459,6 +464,7 @@ impl IndexWriter {
heap_size_in_bytes_per_thread)?;
Ok(index_writer)
}
@@ -511,8 +517,7 @@ impl IndexWriter {
// wait for the segment update thread to have processed the info
self.segment_updater
.commit(self.committed_opstamp)
.wait()?;
.commit(self.committed_opstamp)?;
Ok(self.committed_opstamp)
}
@@ -578,6 +583,7 @@ mod tests {
use Index;
use Term;
use Error;
use env_logger;
#[test]
fn test_lockfile_stops_duplicates() {
@@ -661,6 +667,7 @@ mod tests {
#[test]
fn test_with_merges() {
let _ = env_logger::init();
let mut schema_builder = schema::SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
@@ -688,6 +695,7 @@ mod tests {
index_writer.commit().expect("commit failed");
index_writer.wait_merging_threads().expect("waiting merging thread failed");
index.load_searchers().unwrap();
assert_eq!(num_docs_containing("a"), 200);
assert_eq!(index.searchable_segments().unwrap().len(), 1);
}

View File

@@ -11,7 +11,7 @@ use directory::Directory;
use indexer::stamper::Stamper;
use Error;
use futures_cpupool::CpuPool;
use futures::{Future, future};
use futures::Future;
use futures::Canceled;
use futures::oneshot;
use indexer::{MergePolicy, DefaultMergePolicy};
@@ -21,6 +21,7 @@ use indexer::merger::IndexMerger;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use Result;
use futures_cpupool::CpuFuture;
use rustc_serialize::json;
use indexer::delete_queue::DeleteCursor;
use schema::Schema;
@@ -77,8 +78,9 @@ pub fn save_metas(segment_metas: Vec<SegmentMeta>,
};
let mut w = vec!();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
Ok(directory
.atomic_write(&META_FILEPATH, &w[..])?)
let res = directory.atomic_write(&META_FILEPATH, &w[..])?;
debug!("Saved metas {}", json::as_pretty_json(&metas));
Ok(res)
}
@@ -198,25 +200,25 @@ impl SegmentUpdater {
self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst)
}
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> impl Future<Item=T, Error=Error> {
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(&self, f: F) -> CpuFuture<T, Error> {
let me_clone = self.clone();
self.0.pool.spawn_fn(move || {
Ok(f(me_clone))
})
}
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> impl Future<Item=bool, Error=Error> {
if self.is_alive() && generation >= self.0.generation.load(Ordering::Acquire) {
future::Either::A({
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
})
})
pub fn add_segment(&self, generation: usize, segment_entry: SegmentEntry) -> bool {
if generation >= self.0.generation.load(Ordering::Acquire) {
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
}).forget();
true
}
else {
future::Either::B(future::ok(false))
false
}
}
@@ -250,20 +252,26 @@ impl SegmentUpdater {
}
}
pub fn commit(&self, opstamp: u64) -> impl Future<Item=(), Error=Error> {
pub fn commit(&self, opstamp: u64) -> Result<()> {
self.run_async(move |segment_updater| {
let mut index = segment_updater.0.index.clone();
if segment_updater.is_alive() {
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_entries);
let mut index = segment_updater.0.index.clone();
segment_updater.save_metas(opstamp);
let living_files = segment_updater.0.segment_manager.list_files();
index.directory_mut().garbage_collect(living_files);
segment_updater.consider_merge_options();
}
})
let living_files = segment_updater.0.segment_manager.list_files();
index.directory_mut().garbage_collect(living_files);
segment_updater.consider_merge_options();
}).wait()
}
@@ -298,7 +306,6 @@ impl SegmentUpdater {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.wait()
.expect("Segment updater thread is corrupted.");
// the future may fail if the listener of the oneshot future
@@ -315,6 +322,41 @@ impl SegmentUpdater {
// merging_future_send will be dropped, sending an error to the future.
}
}
// <<<<<<< HEAD
// =======
// let segments: Vec<Segment> = segment_metas
// .iter()
// .cloned()
// .map(|segment_meta| index.segment(segment_meta))
// .collect();
// // An IndexMerger is like a "view" of our merged segments.
// let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
// let mut merged_segment = index.new_segment();
// // ... we just serialize this index merger in our new segment
// // to merge the two segments.
// let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect("Creating index serializer failed");
// let num_docs = merger.write(segment_serializer).expect("Serializing merged index failed");
// let mut segment_meta = SegmentMeta::new(merged_segment.id());
// segment_meta.set_max_doc(num_docs);
// let segment_entry = SegmentEntry::new(segment_meta);
// segment_updater_clone
// .end_merge(segment_metas.clone(), segment_entry.clone())
// .unwrap();
// // Send will fail if nobody is waiting for the result and
// // the receiver side got destroyed.
// //
// // This is not a problem.
// let _send_result = merging_future_send
// .send(segment_entry.clone());
// >>>>>>> master
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
Ok(())
});
@@ -345,9 +387,10 @@ impl SegmentUpdater {
fn end_merge(&self,
before_merge_segment_ids: Vec<SegmentId>,
mut after_merge_segment_entry: SegmentEntry) -> impl Future<Item=(), Error=Error> {
mut after_merge_segment_entry: SegmentEntry) -> Result<()> {
self.run_async(move |segment_updater| {
debug!("End merge {:?}", after_merge_segment_entry.meta());
if let Some(delete_operation) = after_merge_segment_entry.delete_cursor().peek() {
let committed_opstamp = segment_updater.0.index.opstamp();
if delete_operation.opstamp < committed_opstamp {
@@ -358,10 +401,10 @@ impl SegmentUpdater {
}
segment_updater.0.segment_manager.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
segment_updater.save_metas(segment_updater.0.index.opstamp());
})
}).wait()
}
pub fn wait_merging_thread(&self) -> thread::Result<()> {
pub fn wait_merging_thread(&self) -> Result<()> {
let mut new_merging_threads = HashMap::new();
{
let mut merging_threads = self.0.merging_threads.write().unwrap();
@@ -370,9 +413,13 @@ impl SegmentUpdater {
for (_, merging_thread_handle) in new_merging_threads {
merging_thread_handle
.join()
.map(|_| ())?
.map(|_| ())
.map_err(|_| {
Error::ErrorInThread("Merging thread failed.".to_string())
})?
}
Ok(())
// Our merging thread may have queued their completed
self.run_async(move |_| {}).wait()
}
}
@@ -440,4 +487,4 @@ mod tests {
assert_eq!(index.searcher().segment_readers().len(), 1);
assert_eq!(index.searcher().num_docs(), 302);
}
}
}

View File

@@ -50,9 +50,15 @@ extern crate bit_set;
extern crate futures;
extern crate futures_cpupool;
#[cfg(test)]
extern crate env_logger;
#[cfg(feature="simdcompression")]
extern crate libc;
#[cfg(windows)]
extern crate winapi;
#[cfg(test)] extern crate test;
#[cfg(test)] extern crate rand;