feat(fulltext_index): integrate puffin manager with inverted index applier (#4266)

* feat(fulltext_index): integrate puffin manager with inverted index applier

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: get rid of unexpected not found from write cache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: move create_dir_all to BoundedStager::new

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: update config.md

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* config: unify directories

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: silent remove

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: config docs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: auxiliary -> aux

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-07-04 14:18:58 +08:00
committed by GitHub
parent ee9a5d7611
commit 65c9fbbd2f
33 changed files with 649 additions and 204 deletions

View File

@@ -118,12 +118,15 @@
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.<br/>Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
@@ -399,12 +402,15 @@
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.<br/>Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |

View File

@@ -394,6 +394,21 @@ parallel_scan_channel_size = 32
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## The options for index in Mito engine.
[region_engine.mito.index]
## Auxiliary directory path for the index in filesystem, used to store intermediate files for
## creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
## The default name for this directory is `index_intermediate` for backward compatibility.
##
## This path contains two subdirectories:
## - `__intm`: for storing intermediate files used during creating index.
## - `staging`: for storing staging files used during searching index.
aux_path = ""
## The max capacity of the staging directory.
staging_size = "2GB"
## The options for inverted index in Mito engine.
[region_engine.mito.inverted_index]
@@ -416,7 +431,7 @@ apply_on_query = "auto"
## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64M"
## File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""
[region_engine.mito.memtable]

View File

@@ -417,6 +417,21 @@ parallel_scan_channel_size = 32
## Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
## The options for index in Mito engine.
[region_engine.mito.index]
## Auxiliary directory path for the index in filesystem, used to store intermediate files for
## creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
## The default name for this directory is `index_intermediate` for backward compatibility.
##
## This path contains two subdirectories:
## - `__intm`: for storing intermediate files used during creating index.
## - `staging`: for storing staging files used during searching index.
aux_path = ""
## The max capacity of the staging directory.
staging_size = "2GB"
## The options for inverted index in Mito engine.
[region_engine.mito.inverted_index]
@@ -439,7 +454,7 @@ apply_on_query = "auto"
## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64M"
## File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""
[region_engine.mito.memtable]

View File

@@ -27,6 +27,7 @@ use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
@@ -40,6 +41,8 @@ pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
/// Puffin manager factory for index.
puffin_manager_factory: PuffinManagerFactory,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}
@@ -57,11 +60,13 @@ impl AccessLayer {
pub fn new(
region_dir: impl Into<String>,
object_store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> AccessLayer {
AccessLayer {
region_dir: region_dir.into(),
object_store,
puffin_manager_factory,
intermediate_manager,
}
}
@@ -76,6 +81,11 @@ impl AccessLayer {
&self.object_store
}
/// Returns the puffin manager factory.
pub fn puffin_manager_factory(&self) -> &PuffinManagerFactory {
&self.puffin_manager_factory
}
/// Deletes a SST file (and its index file if it has one) with given file id.
pub(crate) async fn delete_sst(&self, file_meta: &FileMeta) -> Result<()> {
let path = location::sst_file_path(&self.region_dir, file_meta.file_id);
@@ -86,15 +96,13 @@ impl AccessLayer {
file_id: file_meta.file_id,
})?;
if file_meta.inverted_index_available() {
let path = location::index_file_path(&self.region_dir, file_meta.file_id);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: file_meta.file_id,
})?;
}
let path = location::index_file_path(&self.region_dir, file_meta.file_id);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: file_meta.file_id,
})?;
Ok(())
}

View File

@@ -117,6 +117,7 @@ impl FileCache {
}
/// Reads a file from the cache.
#[allow(unused)]
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
// We must use `get()` to update the estimator of the cache.
// See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key

View File

@@ -45,6 +45,7 @@ use crate::schedule::scheduler::LocalScheduler;
use crate::sst::file::{FileMeta, IndexType};
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::parquet::WriteOptions;
/// CompactionRegion represents a region that needs to be compacted.
@@ -93,13 +94,19 @@ pub async fn open_compaction_region(
};
let access_layer = {
let puffin_manager_factory = PuffinManagerFactory::new(
&mito_config.index.aux_path,
mito_config.index.staging_size.as_bytes(),
Some(mito_config.index.write_buffer_size.as_bytes() as _),
)
.await?;
let intermediate_manager =
IntermediateManager::init_fs(mito_config.inverted_index.intermediate_path.clone())
.await?;
IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
Arc::new(AccessLayer::new(
req.region_dir.as_str(),
object_store.clone(),
puffin_manager_factory,
intermediate_manager,
))
};
@@ -266,7 +273,7 @@ impl Compactor for DefaultCompactor {
let index_write_buffer_size = Some(
compaction_region
.engine_config
.inverted_index
.index
.write_buffer_size
.as_bytes() as usize,
);

View File

@@ -15,6 +15,7 @@
//! Configurations.
use std::cmp;
use std::path::Path;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
@@ -104,6 +105,8 @@ pub struct MitoConfig {
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
/// Index configs.
pub index: IndexConfig,
/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,
@@ -134,6 +137,7 @@ impl Default for MitoConfig {
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
memtable: MemtableConfig::default(),
};
@@ -202,7 +206,7 @@ impl MitoConfig {
self.experimental_write_cache_path = join_dir(data_home, "write_cache");
}
self.inverted_index.sanitize(data_home)?;
self.index.sanitize(data_home, &self.inverted_index)?;
Ok(())
}
@@ -246,6 +250,70 @@ impl MitoConfig {
}
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct IndexConfig {
/// Auxiliary directory path for the index in filesystem, used to
/// store intermediate files for creating the index and staging files
/// for searching the index, defaults to `{data_home}/index_intermediate`.
///
/// This path contains two subdirectories:
/// - `__intm`: for storing intermediate files used during creating index.
/// - `staging`: for storing staging files used during searching index.
///
/// The default name for this directory is `index_intermediate` for backward compatibility.
pub aux_path: String,
/// The max capacity of the staging directory.
pub staging_size: ReadableSize,
/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,
}
impl Default for IndexConfig {
fn default() -> Self {
Self {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
write_buffer_size: ReadableSize::mb(8),
}
}
}
impl IndexConfig {
pub fn sanitize(
&mut self,
data_home: &str,
inverted_index: &InvertedIndexConfig,
) -> Result<()> {
#[allow(deprecated)]
if self.aux_path.is_empty() && !inverted_index.intermediate_path.is_empty() {
self.aux_path.clone_from(&inverted_index.intermediate_path);
warn!(
"`inverted_index.intermediate_path` is deprecated, use
`index.aux_path` instead. Set `index.aux_path` to {}",
&inverted_index.intermediate_path
)
}
if self.aux_path.is_empty() {
let path = Path::new(data_home).join("index_intermediate");
self.aux_path = path.as_os_str().to_string_lossy().to_string();
}
if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
warn!(
"Sanitize index write buffer size to {}",
self.write_buffer_size
);
}
Ok(())
}
}
/// Operational mode for certain actions.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
@@ -280,17 +348,23 @@ pub struct InvertedIndexConfig {
pub create_on_compaction: Mode,
/// Whether to apply the index on query: automatically or never.
pub apply_on_query: Mode,
/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,
/// Memory threshold for performing an external sort during index creation.
/// `None` means all sorting will happen in memory.
#[serde_as(as = "NoneAsEmptyString")]
pub mem_threshold_on_create: Option<ReadableSize>,
/// File system path to store intermediate files for external sort, defaults to `{data_home}/index_intermediate`.
#[deprecated = "use [IndexConfig::aux_path] instead"]
#[serde(skip_serializing)]
pub intermediate_path: String,
#[deprecated = "use [IndexConfig::write_buffer_size] instead"]
#[serde(skip_serializing)]
pub write_buffer_size: ReadableSize,
}
impl Default for InvertedIndexConfig {
#[allow(deprecated)]
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
@@ -303,24 +377,6 @@ impl Default for InvertedIndexConfig {
}
}
impl InvertedIndexConfig {
pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
if self.intermediate_path.is_empty() {
self.intermediate_path = join_dir(data_home, "index_intermediate");
}
if self.write_buffer_size < MULTIPART_UPLOAD_MINIMUM_SIZE {
self.write_buffer_size = MULTIPART_UPLOAD_MINIMUM_SIZE;
warn!(
"Sanitize index write buffer size to {}",
self.write_buffer_size
);
}
Ok(())
}
}
/// Divide cpu num by a non-zero `divisor` and returns at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);

View File

@@ -597,13 +597,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Blob type not found, blob_type: {blob_type}"))]
PuffinBlobTypeNotFound {
blob_type: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to write puffin completely"))]
PuffinFinish {
source: puffin::error::Error,
@@ -783,6 +776,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to initialize puffin stager"))]
PuffinInitStager {
source: puffin::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build puffin reader"))]
PuffinBuildReader {
source: puffin::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -821,7 +828,6 @@ impl ErrorExt for Error {
| CreateDefault { .. }
| InvalidParquet { .. }
| OperateAbortedIndex { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. }
| IndexEncodeNull { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
@@ -886,7 +892,9 @@ impl ErrorExt for Error {
PuffinReadMetadata { source, .. }
| PuffinReadBlob { source, .. }
| PuffinFinish { source, .. }
| PuffinAddBlob { source, .. } => source.status_code(),
| PuffinAddBlob { source, .. }
| PuffinInitStager { source, .. }
| PuffinBuildReader { source, .. } => source.status_code(),
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,

View File

@@ -327,12 +327,8 @@ impl RegionFlushTask {
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
self.engine_config
.inverted_index
.write_buffer_size
.as_bytes() as usize,
);
let index_write_buffer_size =
Some(self.engine_config.index.write_buffer_size.as_bytes() as usize);
// Flush to level 0.
let write_request = SstWriteRequest {

View File

@@ -343,6 +343,7 @@ impl ScanRegion {
.iter()
.copied()
.collect(),
self.access_layer.puffin_manager_factory().clone(),
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build index applier"))

View File

@@ -48,6 +48,7 @@ use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};
@@ -63,6 +64,7 @@ pub(crate) struct RegionOpener {
options: Option<RegionOptions>,
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
stats: ManifestStats,
@@ -77,6 +79,7 @@ impl RegionOpener {
memtable_builder_provider: MemtableBuilderProvider,
object_store_manager: ObjectStoreManagerRef,
purge_scheduler: SchedulerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> RegionOpener {
RegionOpener {
@@ -89,6 +92,7 @@ impl RegionOpener {
options: None,
cache_manager: None,
skip_wal_replay: false,
puffin_manager_factory,
intermediate_manager,
time_provider: None,
stats: Default::default(),
@@ -216,6 +220,7 @@ impl RegionOpener {
let access_layer = Arc::new(AccessLayer::new(
self.region_dir,
object_store,
self.puffin_manager_factory,
self.intermediate_manager,
));
let time_provider = self
@@ -317,6 +322,7 @@ impl RegionOpener {
let access_layer = Arc::new(AccessLayer::new(
self.region_dir.clone(),
object_store,
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
));
let file_purger = Arc::new(LocalFilePurger::new(

View File

@@ -97,7 +97,6 @@ impl FilePurger for LocalFilePurger {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use smallvec::SmallVec;
@@ -106,6 +105,7 @@ mod tests {
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
#[tokio::test]
@@ -119,7 +119,12 @@ mod tests {
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = location::sst_file_path(sst_dir, sst_file_id);
let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm"))
let index_aux_path = dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
@@ -127,7 +132,12 @@ mod tests {
object_store.write(&path, vec![0; 4096]).await.unwrap();
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr));
let layer = Arc::new(AccessLayer::new(
sst_dir,
object_store.clone(),
puffin_mgr,
intm_mgr,
));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));
@@ -165,11 +175,16 @@ mod tests {
builder.root(&dir_path);
let sst_file_id = FileId::random();
let sst_dir = "table1";
let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm"))
let index_aux_path = dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let path = location::sst_file_path(sst_dir, sst_file_id);
let path = location::sst_file_path(sst_dir, sst_file_id);
let object_store = ObjectStore::new(builder).unwrap().finish();
object_store.write(&path, vec![0; 4096]).await.unwrap();
@@ -180,7 +195,12 @@ mod tests {
.unwrap();
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr));
let layer = Arc::new(AccessLayer::new(
sst_dir,
object_store.clone(),
puffin_mgr,
intm_mgr,
));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

View File

@@ -16,6 +16,7 @@ pub(crate) mod applier;
mod codec;
pub(crate) mod creator;
pub(crate) mod intermediate;
pub(crate) mod puffin_manager;
mod store;
use std::num::NonZeroUsize;

View File

@@ -16,27 +16,21 @@ pub mod builder;
use std::sync::Arc;
use futures::{AsyncRead, AsyncSeek};
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::file_format::reader::{AsyncReader, PuffinFileReader};
use snafu::{OptionExt, ResultExt};
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{
ApplyIndexSnafu, OpenDalSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu,
PuffinReadMetadataSnafu, Result,
};
use crate::metrics::{
INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_SEEK_OP_TOTAL,
};
use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::FileId;
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;
@@ -50,7 +44,7 @@ pub(crate) struct SstIndexApplier {
region_id: RegionId,
/// Store responsible for accessing remote index files.
store: InstrumentedStore,
store: ObjectStore,
/// The cache of index files.
file_cache: Option<FileCacheRef>,
@@ -58,6 +52,9 @@ pub(crate) struct SstIndexApplier {
/// Predefined index applier used to apply predicates to index files
/// and return the relevant row group ids for further scan.
index_applier: Box<dyn IndexApplier>,
/// The puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
}
pub(crate) type SstIndexApplierRef = Arc<SstIndexApplier>;
@@ -67,18 +64,20 @@ impl SstIndexApplier {
pub fn new(
region_dir: String,
region_id: RegionId,
object_store: ObjectStore,
store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_applier: Box<dyn IndexApplier>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
Self {
region_dir,
region_id,
store: InstrumentedStore::new(object_store),
store,
file_cache,
index_applier,
puffin_manager_factory,
}
}
@@ -91,94 +90,65 @@ impl SstIndexApplier {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
match self.cached_puffin_reader(file_id).await? {
Some(mut puffin_reader) => {
let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?;
let mut index_reader = InvertedIndexBlobReader::new(blob_reader);
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
let blob = match self.cached_blob_reader(file_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
self.remote_blob_reader(file_id).await?
}
None => {
let mut puffin_reader = self.remote_puffin_reader(file_id).await?;
let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?;
let mut index_reader = InvertedIndexBlobReader::new(blob_reader);
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
}
}
};
let mut blob_reader = InvertedIndexBlobReader::new(blob);
let output = self
.index_applier
.apply(context, &mut blob_reader)
.await
.context(ApplyIndexSnafu)?;
Ok(output)
}
/// Helper function to create a [`PuffinFileReader`] from the cached index file.
async fn cached_puffin_reader(
&self,
file_id: FileId,
) -> Result<Option<PuffinFileReader<impl AsyncRead + AsyncSeek>>> {
/// Creates a blob reader from the cached index file.
async fn cached_blob_reader(&self, file_id: FileId) -> Result<Option<BlobReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let Some(indexed_value) = file_cache
.get(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
else {
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
let Some(reader) = file_cache
.reader(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
else {
return Ok(None);
};
let puffin_manager = self.puffin_manager_factory.build(file_cache.local_store());
let puffin_file_name = file_cache.cache_file_path(index_key);
let reader = reader
.into_futures_async_read(0..indexed_value.file_size as u64)
let reader = puffin_manager
.reader(&puffin_file_name)
.await
.context(OpenDalSnafu)?;
Ok(Some(PuffinFileReader::new(reader)))
.context(PuffinBuildReaderSnafu)?
.blob(INDEX_BLOB_TYPE)
.await
.context(PuffinReadBlobSnafu)?
.reader()
.await
.context(PuffinBuildReaderSnafu)?;
Ok(Some(reader))
}
/// Helper function to create a [`PuffinFileReader`] from the remote index file.
async fn remote_puffin_reader(
&self,
file_id: FileId,
) -> Result<PuffinFileReader<impl AsyncRead + AsyncSeek>> {
/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
let puffin_manager = self.puffin_manager_factory.build(self.store.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
let file_reader = self
.store
.reader(
&file_path,
&INDEX_PUFFIN_READ_BYTES_TOTAL,
&INDEX_PUFFIN_READ_OP_TOTAL,
&INDEX_PUFFIN_SEEK_OP_TOTAL,
)
.await?;
Ok(PuffinFileReader::new(file_reader))
}
/// Helper function to create a [`PuffinBlobReader`] for the index blob of the provided index file reader.
async fn index_blob_reader(
puffin_reader: &mut PuffinFileReader<impl AsyncRead + AsyncSeek + Unpin + Send>,
) -> Result<impl AsyncRead + AsyncSeek + '_> {
let file_meta = puffin_reader
.metadata()
puffin_manager
.reader(&file_path)
.await
.context(PuffinReadMetadataSnafu)?;
let blob_meta = file_meta
.blobs
.iter()
.find(|blob| blob.blob_type == INDEX_BLOB_TYPE)
.context(PuffinBlobTypeNotFoundSnafu {
blob_type: INDEX_BLOB_TYPE,
})?;
puffin_reader
.blob_reader(blob_meta)
.context(PuffinReadBlobSnafu)
.context(PuffinBuildReaderSnafu)?
.blob(INDEX_BLOB_TYPE)
.await
.context(PuffinReadBlobSnafu)?
.reader()
.await
.context(PuffinBuildReaderSnafu)
}
}
@@ -194,35 +164,26 @@ mod tests {
use futures::io::Cursor;
use index::inverted_index::search::index_apply::MockIndexApplier;
use object_store::services::Memory;
use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use puffin::puffin_manager::PuffinWriter;
use super::*;
use crate::error::Error;
#[tokio::test]
async fn test_index_applier_apply_basic() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_index_applier_apply_basic_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new(
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
puffin_writer
.add_blob(Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
compressed_data: Cursor::new(vec![]),
properties: Default::default(),
compression_codec: None,
})
let puffin_manager = puffin_manager_factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&path).await.unwrap();
writer
.put_blob(INDEX_BLOB_TYPE, Cursor::new(vec![]), Default::default())
.await
.unwrap();
puffin_writer.finish().await.unwrap();
writer.finish().await.unwrap();
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
@@ -240,6 +201,7 @@ mod tests {
object_store,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
let output = sst_index_applier.apply(file_id).await.unwrap();
assert_eq!(
@@ -254,28 +216,21 @@ mod tests {
#[tokio::test]
async fn test_index_applier_apply_invalid_blob_type() {
let (_d, puffin_manager_factory) =
PuffinManagerFactory::new_for_test_async("test_index_applier_apply_invalid_blob_type_")
.await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
let path = location::index_file_path(&region_dir, file_id);
let mut puffin_writer = PuffinFileWriter::new(
object_store
.writer(&path)
.await
.unwrap()
.into_futures_async_write(),
);
puffin_writer
.add_blob(Blob {
blob_type: "invalid_blob_type".to_string(),
compressed_data: Cursor::new(vec![]),
properties: Default::default(),
compression_codec: None,
})
let puffin_manager = puffin_manager_factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&path).await.unwrap();
writer
.put_blob("invalid_blob_type", Cursor::new(vec![]), Default::default())
.await
.unwrap();
puffin_writer.finish().await.unwrap();
writer.finish().await.unwrap();
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
@@ -287,8 +242,9 @@ mod tests {
object_store,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
let res = sst_index_applier.apply(file_id).await;
assert!(matches!(res, Err(Error::PuffinBlobTypeNotFound { .. })));
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}

View File

@@ -38,6 +38,7 @@ use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnaf
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan.
pub(crate) struct SstIndexApplierBuilder<'a> {
@@ -58,6 +59,9 @@ pub(crate) struct SstIndexApplierBuilder<'a> {
/// Stores predicates during traversal on the Expr tree.
output: HashMap<ColumnId, Vec<Predicate>>,
/// The puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
}
impl<'a> SstIndexApplierBuilder<'a> {
@@ -68,6 +72,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
file_cache: Option<FileCacheRef>,
metadata: &'a RegionMetadata,
ignore_column_ids: HashSet<ColumnId>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
@@ -76,6 +81,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
metadata,
ignore_column_ids,
output: HashMap::default(),
puffin_manager_factory,
}
}
@@ -102,6 +108,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
self.object_store,
self.file_cache,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)))
}
@@ -306,6 +313,8 @@ mod tests {
#[test]
fn test_collect_and_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -313,6 +322,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let expr = Expr::BinaryExpr(BinaryExpr {

View File

@@ -66,9 +66,11 @@ mod tests {
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
#[test]
fn test_collect_between_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -76,6 +78,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let between = Between {
@@ -108,6 +111,8 @@ mod tests {
#[test]
fn test_collect_between_negated() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_negated_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -115,6 +120,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let between = Between {
@@ -130,6 +136,8 @@ mod tests {
#[test]
fn test_collect_between_field_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -137,6 +145,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let between = Between {
@@ -152,6 +161,8 @@ mod tests {
#[test]
fn test_collect_between_type_mismatch() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -159,6 +170,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let between = Between {
@@ -175,6 +187,8 @@ mod tests {
#[test]
fn test_collect_between_nonexistent_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -182,6 +196,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let between = Between {

View File

@@ -138,6 +138,7 @@ mod tests {
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
#[test]
fn test_collect_comparison_basic() {
@@ -224,6 +225,8 @@ mod tests {
),
];
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -231,6 +234,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
for ((left, op, right), _) in &cases {
@@ -249,6 +253,8 @@ mod tests {
#[test]
fn test_collect_comparison_type_mismatch() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -256,6 +262,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
@@ -265,6 +272,8 @@ mod tests {
#[test]
fn test_collect_comparison_field_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -272,6 +281,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
builder
@@ -282,6 +292,8 @@ mod tests {
#[test]
fn test_collect_comparison_nonexistent_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -289,6 +301,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let res = builder.collect_comparison_expr(

View File

@@ -128,9 +128,11 @@ mod tests {
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
tag_column2, test_object_store, test_region_metadata,
};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
#[test]
fn test_collect_eq_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -138,6 +140,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
builder
@@ -165,6 +168,8 @@ mod tests {
#[test]
fn test_collect_eq_field_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -172,6 +177,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
builder
@@ -182,6 +188,8 @@ mod tests {
#[test]
fn test_collect_eq_nonexistent_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -189,6 +197,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
@@ -198,6 +207,8 @@ mod tests {
#[test]
fn test_collect_eq_type_mismatch() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -205,6 +216,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
@@ -214,6 +226,8 @@ mod tests {
#[test]
fn test_collect_or_eq_list_basic() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -221,6 +235,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
@@ -269,6 +284,8 @@ mod tests {
#[test]
fn test_collect_or_eq_list_invalid_op() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -276,6 +293,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
@@ -303,6 +321,8 @@ mod tests {
#[test]
fn test_collect_or_eq_list_multiple_columns() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -310,6 +330,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {

View File

@@ -59,9 +59,11 @@ mod tests {
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
#[test]
fn test_collect_in_list_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -69,6 +71,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let in_list = InList {
@@ -91,6 +94,8 @@ mod tests {
#[test]
fn test_collect_in_list_negated() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -98,6 +103,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let in_list = InList {
@@ -112,6 +118,8 @@ mod tests {
#[test]
fn test_collect_in_list_field_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -119,6 +127,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let in_list = InList {
@@ -133,6 +142,8 @@ mod tests {
#[test]
fn test_collect_in_list_type_mismatch() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -140,6 +151,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let in_list = InList {
@@ -155,6 +167,9 @@ mod tests {
#[test]
fn test_collect_in_list_nonexistent_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -162,6 +177,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let in_list = InList {

View File

@@ -53,9 +53,11 @@ mod tests {
field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store,
test_region_metadata,
};
use crate::sst::index::puffin_manager::PuffinManagerFactory;
#[test]
fn test_regex_match_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -63,6 +65,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
builder
@@ -81,6 +84,8 @@ mod tests {
#[test]
fn test_regex_match_field_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -88,6 +93,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
builder
@@ -99,6 +105,8 @@ mod tests {
#[test]
fn test_regex_match_type_mismatch() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -106,6 +114,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
builder
@@ -117,6 +126,8 @@ mod tests {
#[test]
fn test_regex_match_type_nonexist_column() {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
"test".to_string(),
@@ -124,6 +135,7 @@ mod tests {
None,
&metadata,
HashSet::default(),
facotry,
);
let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc"));

View File

@@ -332,6 +332,7 @@ mod tests {
use super::*;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
fn mock_object_store() -> ObjectStore {
@@ -403,8 +404,10 @@ mod tests {
}
async fn build_applier_factory(
prefix: &str,
tags: BTreeSet<(&'static str, i32)>,
) -> impl Fn(DfExpr) -> BoxFuture<'static, Vec<usize>> {
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let region_dir = "region0".to_string();
let sst_file_id = FileId::random();
let file_path = location::index_file_path(&region_dir, sst_file_id);
@@ -433,12 +436,14 @@ mod tests {
assert_eq!(row_count, tags.len() * segment_row_count);
move |expr| {
let _d = &d;
let applier = SstIndexApplierBuilder::new(
region_dir.clone(),
object_store.clone(),
None,
&region_metadata,
Default::default(),
factory.clone(),
)
.build(&[expr])
.unwrap()
@@ -469,7 +474,7 @@ mod tests {
("abc", 3),
]);
let applier_factory = build_applier_factory(tags).await;
let applier_factory = build_applier_factory("test_create_and_query_get_key_", tags).await;
let expr = col("tag_str").eq(lit("aaa"));
let res = applier_factory(expr).await;
@@ -508,7 +513,7 @@ mod tests {
("abc", 3),
]);
let applier_factory = build_applier_factory(tags).await;
let applier_factory = build_applier_factory("test_create_and_query_range_", tags).await;
let expr = col("tag_str").between(lit("aaa"), lit("aab"));
let res = applier_factory(expr).await;
@@ -541,7 +546,8 @@ mod tests {
("abc", 3),
]);
let applier_factory = build_applier_factory(tags).await;
let applier_factory =
build_applier_factory("test_create_and_query_comparison_", tags).await;
let expr = col("tag_str").lt(lit("aab"));
let res = applier_factory(expr).await;
@@ -600,7 +606,7 @@ mod tests {
("abc", 3),
]);
let applier_factory = build_applier_factory(tags).await;
let applier_factory = build_applier_factory("test_create_and_query_regex_", tags).await;
let expr = binary_expr(col("tag_str"), Operator::RegexMatch, lit(".*"));
let res = applier_factory(expr).await;

View File

@@ -16,6 +16,9 @@ use std::time::{Duration, Instant};
use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_ELAPSED, INDEX_CREATE_ROWS_TOTAL};
pub(crate) type ByteCount = u64;
pub(crate) type RowCount = usize;
/// Stage of the index creation process.
enum Stage {
Update,
@@ -33,9 +36,9 @@ pub(crate) struct Statistics {
/// Accumulated elapsed time for the cleanup stage.
cleanup_eplased: Duration,
/// Number of rows in the index.
row_count: usize,
row_count: RowCount,
/// Number of bytes in the index.
byte_count: u64,
byte_count: ByteCount,
}
impl Statistics {
@@ -58,12 +61,12 @@ impl Statistics {
}
/// Returns row count.
pub fn row_count(&self) -> usize {
pub fn row_count(&self) -> RowCount {
self.row_count
}
/// Returns byte count.
pub fn byte_count(&self) -> u64 {
pub fn byte_count(&self) -> ByteCount {
self.byte_count
}
}

View File

@@ -33,8 +33,8 @@ pub struct IntermediateManager {
impl IntermediateManager {
/// Create a new `IntermediateManager` with the given root path.
/// It will clean up all garbage intermediate files from previous runs.
pub async fn init_fs(root_path: impl AsRef<str>) -> Result<Self> {
let store = new_fs_object_store(&normalize_dir(root_path.as_ref())).await?;
pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
let store = new_fs_object_store(&normalize_dir(aux_path.as_ref())).await?;
let store = InstrumentedStore::new(store);
// Remove all garbage intermediate files from previous runs.

View File

@@ -0,0 +1,207 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard, FsDirGuard};
use puffin::puffin_manager::BlobGuard;
use snafu::ResultExt;
use crate::error::{PuffinInitStagerSnafu, Result};
use crate::metrics::{
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL, INDEX_PUFFIN_READ_OP_TOTAL,
INDEX_PUFFIN_SEEK_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
};
use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
pub(crate) type SstPuffinManager = FsPuffinManager<
Arc<FsBlobGuard>,
Arc<FsDirGuard>,
InstrumentedAsyncRead,
InstrumentedAsyncWrite,
>;
const STAGING_DIR: &str = "staging";
/// A factory for creating `SstPuffinManager` instances.
#[derive(Clone)]
pub struct PuffinManagerFactory {
/// The stager used by the puffin manager.
stager: Arc<BoundedStager>,
/// The size of the write buffer used to create object store.
write_buffer_size: Option<usize>,
}
impl PuffinManagerFactory {
/// Creates a new `PuffinManagerFactory` instance.
pub async fn new(
aux_path: impl AsRef<Path>,
staging_capacity: u64,
write_buffer_size: Option<usize>,
) -> Result<Self> {
let staging_dir = aux_path.as_ref().join(STAGING_DIR);
let stager = BoundedStager::new(staging_dir, staging_capacity)
.await
.context(PuffinInitStagerSnafu)?;
Ok(Self {
stager: Arc::new(stager),
write_buffer_size,
})
}
pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager {
let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store);
SstPuffinManager::new(self.stager.clone(), Arc::new(puffin_file_accessor))
}
}
impl PuffinManagerFactory {
#[cfg(test)]
pub(crate) async fn new_for_test_async(
prefix: &str,
) -> (common_test_util::temp_dir::TempDir, Self) {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
let factory = Self::new(tempdir.path().to_path_buf(), 1024, None)
.await
.unwrap();
(tempdir, factory)
}
#[cfg(test)]
pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
let f = Self::new(tempdir.path().to_path_buf(), 1024, None);
let factory = common_runtime::block_on_bg(f).unwrap();
(tempdir, factory)
}
}
/// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage.
pub(crate) struct ObjectStorePuffinFileAccessor {
object_store: InstrumentedStore,
}
impl ObjectStorePuffinFileAccessor {
pub fn new(object_store: InstrumentedStore) -> Self {
Self { object_store }
}
}
#[async_trait]
impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
type Reader = InstrumentedAsyncRead;
type Writer = InstrumentedAsyncWrite;
async fn reader(&self, puffin_file_name: &str) -> PuffinResult<Self::Reader> {
self.object_store
.reader(
puffin_file_name,
&INDEX_PUFFIN_READ_BYTES_TOTAL,
&INDEX_PUFFIN_READ_OP_TOTAL,
&INDEX_PUFFIN_SEEK_OP_TOTAL,
)
.await
.map_err(BoxedError::new)
.context(puffin_error::ExternalSnafu)
}
async fn writer(&self, puffin_file_name: &str) -> PuffinResult<Self::Writer> {
self.object_store
.writer(
puffin_file_name,
&INDEX_PUFFIN_WRITE_BYTES_TOTAL,
&INDEX_PUFFIN_WRITE_OP_TOTAL,
&INDEX_PUFFIN_FLUSH_OP_TOTAL,
)
.await
.map_err(BoxedError::new)
.context(puffin_error::ExternalSnafu)
}
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures::io::Cursor;
use futures::AsyncReadExt;
use object_store::services::Memory;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
use super::*;
#[tokio::test]
async fn test_puffin_manager_factory() {
let (_dir, factory) =
PuffinManagerFactory::new_for_test_async("test_puffin_manager_factory_").await;
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let manager = factory.build(object_store);
let file_name = "my-puffin-file";
let blob_key = "blob-key";
let dir_key = "dir-key";
let raw_data = b"hello world!";
let mut writer = manager.writer(file_name).await.unwrap();
writer
.put_blob(blob_key, Cursor::new(raw_data), PutOptions::default())
.await
.unwrap();
let dir_data = create_temp_dir("test_puffin_manager_factory_dir_data_");
tokio::fs::write(dir_data.path().join("hello"), raw_data)
.await
.unwrap();
writer
.put_dir(
dir_key,
dir_data.path().into(),
PutOptions {
compression: Some(CompressionCodec::Zstd),
},
)
.await
.unwrap();
writer.finish().await.unwrap();
let reader = manager.reader(file_name).await.unwrap();
let blob_guard = reader.blob(blob_key).await.unwrap();
let mut blob_reader = blob_guard.reader().await.unwrap();
let mut buf = Vec::new();
blob_reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, raw_data);
let dir_guard = reader.dir(dir_key).await.unwrap();
let file = dir_guard.path().join("hello");
let data = tokio::fs::read(file).await.unwrap();
assert_eq!(data, raw_data);
}
}

View File

@@ -20,7 +20,6 @@ use common_base::Plugins;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
use tokio::sync::mpsc::Sender;
@@ -36,6 +35,7 @@ use crate::region::{ManifestContext, ManifestContextRef, RegionState};
use crate::request::WorkerRequest;
use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::worker::WorkerListener;
/// Scheduler mocker.
@@ -55,11 +55,20 @@ impl SchedulerEnv {
let mut builder = Fs::default();
builder.root(&path_str);
let intm_mgr = IntermediateManager::init_fs(join_dir(&path_str, "intm"))
let index_aux_path = path.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let object_store = ObjectStore::new(builder).unwrap().finish();
let access_layer = Arc::new(AccessLayer::new("", object_store.clone(), intm_mgr));
let access_layer = Arc::new(AccessLayer::new(
"",
object_store.clone(),
puffin_mgr,
intm_mgr,
));
SchedulerEnv {
path,

View File

@@ -58,6 +58,7 @@ use crate::request::{
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
@@ -132,10 +133,15 @@ impl WorkerGroup {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
));
let intermediate_manager =
IntermediateManager::init_fs(&config.inverted_index.intermediate_path)
.await?
.with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
)
.await?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
// We use another scheduler to avoid purge jobs blocking other jobs.
// A purge job is cheaper than other background jobs so they share the same job limit.
@@ -169,6 +175,7 @@ impl WorkerGroup {
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
puffin_manager_factory: puffin_manager_factory.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
@@ -261,10 +268,15 @@ impl WorkerGroup {
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let intermediate_manager =
IntermediateManager::init_fs(&config.inverted_index.intermediate_path)
.await?
.with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
)
.await?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
@@ -292,6 +304,7 @@ impl WorkerGroup {
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
puffin_manager_factory: puffin_manager_factory.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
@@ -361,6 +374,7 @@ struct WorkerStarter<S> {
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
/// Watch channel sender to notify workers to handle stalled requests.
@@ -408,6 +422,7 @@ impl<S: LogStore> WorkerStarter<S> {
stalled_requests: StalledRequests::default(),
listener: self.listener,
cache_manager: self.cache_manager,
puffin_manager_factory: self.puffin_manager_factory,
intermediate_manager: self.intermediate_manager,
time_provider: self.time_provider,
last_periodical_check_millis: now,
@@ -586,6 +601,8 @@ struct RegionWorkerLoop<S> {
listener: WorkerListener,
/// Cache.
cache_manager: CacheManagerRef,
/// Puffin manager factory for index.
puffin_manager_factory: PuffinManagerFactory,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
/// Provider to get current time.

View File

@@ -54,6 +54,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
)
.cache(Some(self.cache_manager.clone()))

View File

@@ -61,6 +61,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
)
.metadata(metadata)

View File

@@ -93,6 +93,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
)
.skip_wal_replay(request.skip_wal_replay)

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use std::io::Error as IoError;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
@@ -248,6 +248,14 @@ pub enum Error {
#[snafu(display("Get value from cache"))]
CacheGet { source: Arc<Error> },
#[snafu(display("External error"))]
External {
#[snafu(source)]
error: BoxedError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -287,6 +295,8 @@ impl ErrorExt for Error {
DuplicateBlob { .. } => StatusCode::InvalidArguments,
CacheGet { source } => source.status_code(),
External { error, .. } => error.status_code(),
}
}

View File

@@ -18,7 +18,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
pub use bounded_stager::BoundedStager;
pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
use futures::future::BoxFuture;
use futures::AsyncWrite;

View File

@@ -68,6 +68,10 @@ pub struct BoundedStager {
impl BoundedStager {
pub async fn new(base_dir: PathBuf, capacity: u64) -> Result<Self> {
tokio::fs::create_dir_all(&base_dir)
.await
.context(CreateSnafu)?;
let recycle_bin = Cache::builder()
.time_to_live(Duration::from_secs(60))
.build();

View File

@@ -826,13 +826,16 @@ sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
[region_engine.mito.index]
aux_path = ""
staging_size = "2GiB"
write_buffer_size = "8MiB"
[region_engine.mito.inverted_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
write_buffer_size = "8MiB"
mem_threshold_on_create = "64.0MiB"
intermediate_path = ""
[region_engine.mito.memtable]
type = "time_series"