feat(mito): support write cache for index file (#3144)

* feat(mito): support write cache for index file

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: merge main

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Author: Zhenchi
Date: 2024-01-12 10:40:56 +08:00
Committed by: GitHub
Parent: 0882da4d01
Commit: c1190bae7b
15 changed files with 277 additions and 105 deletions


@@ -102,7 +102,8 @@ impl AccessLayer {
request: SstWriteRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let path = location::sst_file_path(&self.region_dir, request.file_id);
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;
let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
@@ -114,7 +115,8 @@ impl AccessLayer {
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: path,
upload_path: file_path,
index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
},
write_opts,
@@ -122,7 +124,8 @@ impl AccessLayer {
.await?
} else {
// Write cache is disabled.
let mut writer = ParquetWriter::new(path, request.metadata, self.object_store.clone());
let mut writer =
ParquetWriter::new(file_path, request.metadata, self.object_store.clone());
writer.write_all(request.source, write_opts).await?
};


@@ -71,7 +71,7 @@ impl FileCache {
// The cache entry is replaced by another file. This is unexpected; we don't remove the
// file, but we update the metrics, as the file has already been replaced.
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0);
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
return;
}
@@ -80,7 +80,7 @@ impl FileCache {
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
}
Err(e) => {
warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0);
warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
}
}
}
@@ -241,7 +241,51 @@ impl FileCache {
}
/// Key of file cache index.
pub(crate) type IndexKey = (RegionId, FileId);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct IndexKey {
pub region_id: RegionId,
pub file_id: FileId,
pub file_type: FileType,
}
impl IndexKey {
/// Creates a new index key.
pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
IndexKey {
region_id,
file_id,
file_type,
}
}
}
/// Type of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
/// Parquet file.
Parquet,
/// Puffin file.
Puffin,
}
impl FileType {
/// Parses the file type from string.
fn parse(s: &str) -> Option<FileType> {
match s {
"parquet" => Some(FileType::Parquet),
"puffin" => Some(FileType::Puffin),
_ => None,
}
}
/// Converts the file type to string.
fn as_str(&self) -> &'static str {
match self {
FileType::Parquet => "parquet",
FileType::Puffin => "puffin",
}
}
}
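`parse` accepts only an exact, known suffix; any other tail makes the whole file name unparsable, which is what the updated `parse_index_key` tests below exercise. A quick in-crate illustration (both methods are private, so this only compiles inside the module):

assert_eq!(FileType::parse("parquet"), Some(FileType::Parquet));
assert_eq!(FileType::Parquet.as_str(), "parquet");
// A truncated suffix such as "parque" is rejected rather than guessed.
assert_eq!(FileType::parse("parque"), None);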
/// An entity that describes the file in the file cache.
///
@@ -254,21 +298,30 @@ pub(crate) struct IndexValue {
/// Generates the path to the cached file.
///
/// The file name format is `{region_id}.{file_id}`
/// The file name format is `{region_id}.{file_id}.{file_type}`
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1))
join_path(
cache_file_dir,
&format!(
"{}.{}.{}",
key.region_id.as_u64(),
key.file_id,
key.file_type.as_str()
),
)
}
/// Parse index key from the file name.
fn parse_index_key(name: &str) -> Option<IndexKey> {
let mut splited = name.splitn(2, '.');
let region_id = splited.next().and_then(|s| {
let mut split = name.splitn(3, '.');
let region_id = split.next().and_then(|s| {
let id = s.parse::<u64>().ok()?;
Some(RegionId::from_u64(id))
})?;
let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?;
let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
let file_type = split.next().and_then(FileType::parse)?;
Some((region_id, file_id))
Some(IndexKey::new(region_id, file_id, file_type))
}
#[cfg(test)]
@@ -293,7 +346,7 @@ mod tests {
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);
// Get an empty file.
@@ -306,7 +359,10 @@ mod tests {
.unwrap();
// Add to the cache.
cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.put(
IndexKey::new(region_id, file_id, FileType::Parquet),
IndexValue { file_size: 5 },
)
.await;
// Read file content.
@@ -339,7 +395,7 @@ mod tests {
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);
// Write a file.
@@ -349,7 +405,10 @@ mod tests {
.unwrap();
// Add to the cache.
cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.put(
IndexKey::new(region_id, file_id, FileType::Parquet),
IndexValue { file_size: 5 },
)
.await;
// Remove the file but keep the index.
@@ -368,11 +427,12 @@ mod tests {
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
// Write N files.
let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
let mut total_size = 0;
for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let key = IndexKey::new(region_id, *file_id, file_type);
let file_path = cache.cache_file_path(key);
let bytes = i.to_string().into_bytes();
local_store.write(&file_path, bytes.clone()).await.unwrap();
@@ -380,7 +440,7 @@ mod tests {
// Add to the cache.
cache
.put(
(region_id, *file_id),
IndexKey::new(region_id, *file_id, file_type),
IndexValue {
file_size: bytes.len() as u32,
},
@@ -392,7 +452,10 @@ mod tests {
// Recover the cache.
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
// No entry before recovery.
assert!(cache.reader((region_id, file_ids[0])).await.is_none());
assert!(cache
.reader(IndexKey::new(region_id, file_ids[0], file_type))
.await
.is_none());
cache.recover().await.unwrap();
// Check size.
@@ -400,7 +463,7 @@ mod tests {
assert_eq!(total_size, cache.memory_index.weighted_size() as usize);
for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let key = IndexKey::new(region_id, *file_id, file_type);
let mut reader = cache.reader(key).await.unwrap();
let mut buf = String::new();
reader.read_to_string(&mut buf).await.unwrap();
@@ -415,7 +478,7 @@ mod tests {
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = file_cache.cache_file_path(key);
// Write a file.
let data = b"hello greptime database";
@@ -424,9 +487,7 @@ mod tests {
.await
.unwrap();
// Add to the cache.
file_cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.await;
file_cache.put(key, IndexValue { file_size: 5 }).await;
// Ranges
let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
@@ -442,12 +503,18 @@ mod tests {
fn test_cache_file_path() {
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
assert_eq!(
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
cache_file_path("test_dir", (RegionId::new(1234, 5), file_id))
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
cache_file_path(
"test_dir",
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
)
);
assert_eq!(
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095",
cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id))
"test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
cache_file_path(
"test_dir/",
IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
)
);
}
@@ -456,8 +523,8 @@ mod tests {
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
let region_id = RegionId::new(1234, 5);
assert_eq!(
(region_id, file_id),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap()
IndexKey::new(region_id, file_id, FileType::Parquet),
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
);
assert!(parse_index_key("").is_none());
assert!(parse_index_key(".").is_none());
@@ -466,8 +533,13 @@ mod tests {
assert!(parse_index_key(".5299989643269").is_none());
assert!(parse_index_key("5299989643269.").is_none());
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
assert!(
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none()
parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
);
assert!(parse_index_key(
"5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin"
)
.is_none());
}
}
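Taken together, the cache now addresses entries through the typed `IndexKey` instead of a `(RegionId, FileId)` tuple, so Parquet data files and Puffin index files can coexist for the same `FileId`. A minimal usage sketch under the same assumptions as the tests above (a crate-internal `FileCache` backed by a local store):

// One key per cached file: region + file + type.
let key = IndexKey::new(RegionId::new(2000, 0), FileId::random(), FileType::Puffin);

// The local file name now carries the type suffix: "{region_id}.{file_id}.puffin".
let file_path = cache.cache_file_path(key);

// Register the entry and read it back through the same key.
cache.put(key, IndexValue { file_size: 5 }).await;
let reader = cache.reader(key).await; // None on a cache miss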


@@ -25,10 +25,10 @@ use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use super::file_cache::IndexKey;
use crate::access_layer::new_fs_object_store;
use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::read::Source;
@@ -78,6 +78,11 @@ impl WriteCache {
Self::new(local_store, object_store_manager, cache_capacity).await
}
/// Returns the file cache of the write cache.
pub(crate) fn file_cache(&self) -> FileCacheRef {
self.file_cache.clone()
}
/// Writes SST to the cache and then uploads it to the remote object store.
pub async fn write_and_upload_sst(
&self,
@@ -90,11 +95,11 @@ impl WriteCache {
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
let cache_path = self.file_cache.cache_file_path((region_id, file_id));
// Write to FileCache.
let mut writer = ParquetWriter::new(
cache_path.clone(),
self.file_cache.cache_file_path(parquet_key),
request.metadata,
self.file_cache.local_store(),
);
@@ -109,8 +114,36 @@ impl WriteCache {
return Ok(None);
};
let parquet_path = &request.upload_path;
let remote_store = &request.remote_store;
self.upload(parquet_key, parquet_path, remote_store).await?;
if sst_info.inverted_index_available {
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let puffin_path = &request.index_upload_path;
self.upload(puffin_key, puffin_path, remote_store).await?;
}
Ok(Some(sst_info))
}
/// Uploads a Parquet file or a Puffin file to the remote object store.
async fn upload(
&self,
index_key: IndexKey,
upload_path: &str,
remote_store: &ObjectStore,
) -> Result<()> {
let region_id = index_key.region_id;
let file_id = index_key.file_id;
let file_type = index_key.file_type;
let cache_path = self.file_cache.cache_file_path(index_key);
let timer = FLUSH_ELAPSED
.with_label_values(&["upload_sst"])
.with_label_values(&[match file_type {
FileType::Parquet => "upload_parquet",
FileType::Puffin => "upload_puffin",
}])
.start_timer();
let reader = self
@@ -120,17 +153,20 @@ impl WriteCache {
.await
.context(error::OpenDalSnafu)?;
let upload_path = request.upload_path;
let mut writer = request
.remote_store
.writer_with(&upload_path)
let mut writer = remote_store
.writer_with(upload_path)
.buffer(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?;
let bytes_written = futures::io::copy(reader, &mut writer)
.await
.context(error::UploadSstSnafu { region_id, file_id })?;
let bytes_written =
futures::io::copy(reader, &mut writer)
.await
.context(error::UploadSnafu {
region_id,
file_id,
file_type,
})?;
// Must close to upload all data.
writer.close().await.context(error::OpenDalSnafu)?;
@@ -145,18 +181,13 @@ impl WriteCache {
timer.stop_and_record()
);
let index_value = IndexValue {
file_size: bytes_written as _,
};
// Register to file cache
let file_size = sst_info.file_size as u32;
self.file_cache
.put((region_id, file_id), IndexValue { file_size })
.await;
self.file_cache.put(index_key, index_value).await;
Ok(Some(sst_info))
}
/// Returns the file cache of the write cache.
pub(crate) fn file_cache(&self) -> FileCacheRef {
self.file_cache.clone()
Ok(())
}
}
@@ -168,6 +199,8 @@ pub struct SstUploadRequest {
pub storage: Option<String>,
/// Path to upload the file.
pub upload_path: String,
/// Path to upload the index file.
pub index_upload_path: String,
/// Remote object store to upload.
pub remote_store: ObjectStore,
}
@@ -186,7 +219,7 @@ mod tests {
use crate::cache::file_cache::{self, FileCache};
use crate::cache::test_util::new_fs_store;
use crate::sst::file::FileId;
use crate::sst::location::sst_file_path;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::test_util::sst_util::{
new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
};
@@ -200,6 +233,7 @@ mod tests {
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);
let index_upload_path = index_file_path("test", file_id);
// Create WriteCache
let local_dir = create_temp_dir("");
@@ -228,6 +262,7 @@ mod tests {
source,
storage: None,
upload_path: upload_path.clone(),
index_upload_path,
remote_store: mock_store.clone(),
};
@@ -244,7 +279,7 @@ mod tests {
.unwrap();
// Check write cache contains the key
let key = (region_id, file_id);
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
assert!(write_cache.file_cache.contains_key(&key));
// Check file data
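With the index upload path added, a flush names both remote targets up front. A condensed sketch of assembling the request, mirroring the test above (`metadata`, `source`, and `mock_store` are assumed to be prepared as in that test):

let request = SstUploadRequest {
    file_id,
    metadata,
    source,
    storage: None,
    upload_path: sst_file_path("test", file_id),
    index_upload_path: index_file_path("test", file_id),
    remote_store: mock_store.clone(),
};

`write_and_upload_sst` then writes the Parquet file into the local file cache, uploads it to `upload_path`, and uploads the Puffin index to `index_upload_path` only when `sst_info.inverted_index_available` is set.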


@@ -28,6 +28,7 @@ use snafu::{Location, Snafu};
use store_api::manifest::ManifestVersion;
use store_api::storage::RegionId;
use crate::cache::file_cache::FileType;
use crate::sst::file::FileId;
use crate::worker::WorkerId;
@@ -522,13 +523,15 @@ pub enum Error {
},
#[snafu(display(
"Failed to upload sst file, region_id: {}, file_id: {}",
"Failed to upload file, region_id: {}, file_id: {}, file_type: {:?}",
region_id,
file_id
file_id,
file_type,
))]
UploadSst {
Upload {
region_id: RegionId,
file_id: FileId,
file_type: FileType,
#[snafu(source)]
error: std::io::Error,
location: Location,
@@ -629,7 +632,7 @@ impl ErrorExt for Error {
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,
UploadSst { .. } => StatusCode::StorageUnavailable,
Upload { .. } => StatusCode::StorageUnavailable,
}
}


@@ -23,6 +23,7 @@ use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::read::projection::ProjectionMapper;
@@ -233,9 +234,17 @@ impl ScanRegion {
/// Use the latest schema to build the index applier.
fn build_index_applier(&self) -> Option<SstIndexApplierRef> {
let file_cache = || -> Option<FileCacheRef> {
let cache_manager = self.cache_manager.as_ref()?;
let write_cache = cache_manager.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();
SstIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
self.version.metadata.as_ref(),
)
.build(&self.request.filters)
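The `|| -> Option<FileCacheRef> { ... }()` form above is an immediately invoked closure: it gives `?` something to early-return from, so the chain of optional layers (cache manager, then write cache) collapses without a named helper. A standalone illustration of the idiom with toy values:

// Run the closure immediately so `?` can short-circuit to `None`.
let doubled = || -> Option<usize> {
    let s: Option<&str> = Some("cache");
    Some(s?.len() * 2)
}();
assert_eq!(doubled, Some(10));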


@@ -224,6 +224,7 @@ impl SeqScan {
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.index_applier(self.index_applier.clone())
.build()
.await;
let reader = match maybe_reader {


@@ -25,7 +25,9 @@ use index::inverted_index::search::index_apply::{
use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{
ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu,
Result,
@@ -41,34 +43,42 @@ use crate::sst::location;
/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub struct SstIndexApplier {
pub(crate) struct SstIndexApplier {
/// The root directory of the region.
region_dir: String,
/// Store responsible for accessing SST files.
/// Region ID.
region_id: RegionId,
/// Store responsible for accessing remote index files.
store: InstrumentedStore,
/// The cache of index files.
file_cache: Option<FileCacheRef>,
/// Predefined index applier used to apply predicates to index files
/// and return the relevant row group ids for further scan.
index_applier: Box<dyn IndexApplier>,
}
pub type SstIndexApplierRef = Arc<SstIndexApplier>;
pub(crate) type SstIndexApplierRef = Arc<SstIndexApplier>;
impl SstIndexApplier {
/// Creates a new [`SstIndexApplier`].
///
/// TODO(zhongzc): leverage `WriteCache`
pub fn new(
region_dir: String,
region_id: RegionId,
object_store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_applier: Box<dyn IndexApplier>,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
Self {
region_dir,
region_id,
store: InstrumentedStore::new(object_store),
file_cache,
index_applier,
}
}
@@ -77,22 +87,49 @@ impl SstIndexApplier {
pub async fn apply(&self, file_id: FileId) -> Result<BTreeSet<usize>> {
let _timer = INDEX_APPLY_ELAPSED.start_timer();
let mut puffin_reader = self.puffin_reader(file_id).await?;
let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?;
let mut index_reader = InvertedIndexBlobReader::new(blob_reader);
let context = SearchContext {
// Encountering a non-existing column indicates that it doesn't match predicates.
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
match self.cached_puffin_reader(file_id).await? {
Some(mut puffin_reader) => {
let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?;
let mut index_reader = InvertedIndexBlobReader::new(blob_reader);
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
}
None => {
let mut puffin_reader = self.remote_puffin_reader(file_id).await?;
let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?;
let mut index_reader = InvertedIndexBlobReader::new(blob_reader);
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
}
}
}
/// Helper function to create a [`PuffinFileReader`] for the provided SST file id.
async fn puffin_reader(
/// Helper function to create a [`PuffinFileReader`] from the cached index file.
async fn cached_puffin_reader(
&self,
file_id: FileId,
) -> Result<Option<PuffinFileReader<impl AsyncRead + AsyncSeek>>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
Ok(file_cache
.reader(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
.map(PuffinFileReader::new))
}
/// Helper function to create a [`PuffinFileReader`] from the remote index file.
async fn remote_puffin_reader(
&self,
file_id: FileId,
) -> Result<PuffinFileReader<impl AsyncRead + AsyncSeek>> {
@@ -172,7 +209,9 @@ mod tests {
let sst_index_applier = SstIndexApplier::new(
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
Box::new(mock_index_applier),
);
let ids = sst_index_applier.apply(file_id).await.unwrap();
@@ -203,7 +242,9 @@ mod tests {
let sst_index_applier = SstIndexApplier::new(
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
Box::new(mock_index_applier),
);
let res = sst_index_applier.apply(file_id).await;
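The two `apply` arms above repeat the blob-reading steps because the cached and remote readers have different concrete `impl AsyncRead + AsyncSeek` types. A hypothetical private helper they could both delegate to (the bounds are assumptions; `index_blob_reader` is the crate-internal helper already used above):

async fn apply_with<R>(
    &self,
    mut puffin_reader: PuffinFileReader<R>,
    context: SearchContext,
) -> Result<BTreeSet<usize>>
where
    R: AsyncRead + AsyncSeek + Unpin + Send,
{
    let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?;
    let mut index_reader = InvertedIndexBlobReader::new(blob_reader);
    self.index_applier
        .apply(context, &mut index_reader)
        .await
        .context(ApplyIndexSnafu)
}

Each arm then reduces to a single `self.apply_with(reader, context).await` call.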


@@ -34,19 +34,23 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan.
pub struct SstIndexApplierBuilder<'a> {
pub(crate) struct SstIndexApplierBuilder<'a> {
/// Directory of the region, required argument for constructing [`SstIndexApplier`].
region_dir: String,
/// Object store, required argument for constructing [`SstIndexApplier`].
object_store: ObjectStore,
/// File cache, required argument for constructing [`SstIndexApplier`].
file_cache: Option<FileCacheRef>,
/// Metadata of the region, used to get metadata like column type.
metadata: &'a RegionMetadata,
@@ -59,11 +63,13 @@ impl<'a> SstIndexApplierBuilder<'a> {
pub fn new(
region_dir: String,
object_store: ObjectStore,
file_cache: Option<FileCacheRef>,
metadata: &'a RegionMetadata,
) -> Self {
Self {
region_dir,
object_store,
file_cache,
metadata,
output: HashMap::default(),
}
@@ -88,7 +94,9 @@ impl<'a> SstIndexApplierBuilder<'a> {
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(SstIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
self.file_cache,
Box::new(applier.context(BuildIndexApplierSnafu)?),
)))
}
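Callers construct the builder as before, with the optional cache handle threaded through; it typically comes from `CacheManager::write_cache()` as wired in the scan-region change above. A sketch (passing `None` keeps the old remote-only behavior; `filters` stands for the scan's DataFusion exprs):

let builder = SstIndexApplierBuilder::new(
    region_dir.to_string(),
    object_store.clone(),
    file_cache, // Option<FileCacheRef>; None disables local index reads
    &metadata,
);
let maybe_applier = builder.build(&filters)?; // None when nothing to apply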
@@ -286,7 +294,7 @@ mod tests {
fn test_collect_and_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(DfExpr::BinaryExpr(BinaryExpr {


@@ -69,7 +69,7 @@ mod tests {
fn test_collect_between_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let between = Between {
negated: false,
@@ -103,7 +103,7 @@ mod tests {
fn test_collect_between_negated() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let between = Between {
negated: true,
@@ -120,7 +120,7 @@ mod tests {
fn test_collect_between_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let between = Between {
negated: false,
@@ -137,7 +137,7 @@ mod tests {
fn test_collect_between_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let between = Between {
negated: false,
@@ -155,7 +155,7 @@ mod tests {
fn test_collect_between_nonexistent_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let between = Between {
negated: false,


@@ -224,7 +224,7 @@ mod tests {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
for ((left, op, right), _) in &cases {
builder.collect_comparison_expr(left, op, right).unwrap();
@@ -244,7 +244,7 @@ mod tests {
fn test_collect_comparison_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
@@ -255,7 +255,7 @@ mod tests {
fn test_collect_comparison_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
builder
.collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc"))
@@ -267,7 +267,7 @@ mod tests {
fn test_collect_comparison_nonexistent_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let res = builder.collect_comparison_expr(
&nonexistent_column(),


@@ -133,7 +133,7 @@ mod tests {
fn test_collect_eq_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
builder
.collect_eq(&tag_column(), &string_lit("foo"))
@@ -162,7 +162,7 @@ mod tests {
fn test_collect_eq_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
builder
.collect_eq(&field_column(), &string_lit("abc"))
@@ -174,7 +174,7 @@ mod tests {
fn test_collect_eq_nonexistent_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
@@ -185,7 +185,7 @@ mod tests {
fn test_collect_eq_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let res = builder.collect_eq(&tag_column(), &int64_lit(1));
assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
@@ -196,7 +196,7 @@ mod tests {
fn test_collect_or_eq_list_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
@@ -246,7 +246,7 @@ mod tests {
fn test_collect_or_eq_list_invalid_op() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),
@@ -275,7 +275,7 @@ mod tests {
fn test_collect_or_eq_list_multiple_columns() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(tag_column()),


@@ -64,7 +64,7 @@ mod tests {
fn test_collect_in_list_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let in_list = InList {
expr: Box::new(tag_column()),
@@ -88,7 +88,7 @@ mod tests {
fn test_collect_in_list_negated() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let in_list = InList {
expr: Box::new(tag_column()),
@@ -104,7 +104,7 @@ mod tests {
fn test_collect_in_list_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let in_list = InList {
expr: Box::new(field_column()),
@@ -120,7 +120,7 @@ mod tests {
fn test_collect_in_list_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let in_list = InList {
expr: Box::new(tag_column()),
@@ -137,7 +137,7 @@ mod tests {
fn test_collect_in_list_nonexistent_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let in_list = InList {
expr: Box::new(nonexistent_column()),


@@ -56,7 +56,7 @@ mod tests {
fn test_regex_match_basic() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
builder
.collect_regex_match(&tag_column(), &string_lit("abc"))
@@ -76,7 +76,7 @@ mod tests {
fn test_regex_match_field_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
builder
.collect_regex_match(&field_column(), &string_lit("abc"))
@@ -89,7 +89,7 @@ mod tests {
fn test_regex_match_type_mismatch() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
builder
.collect_regex_match(&tag_column(), &int64_lit(123))
@@ -102,7 +102,7 @@ mod tests {
fn test_regex_match_type_nonexist_column() {
let metadata = test_region_metadata();
let mut builder =
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata);
SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata);
let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc"));
assert!(matches!(res, Err(Error::ColumnNotFound { .. })));


@@ -49,7 +49,7 @@ use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
/// Parquet SST reader builder.
pub struct ParquetReaderBuilder {
pub(crate) struct ParquetReaderBuilder {
/// SST directory.
file_dir: String,
file_handle: FileHandle,


@@ -29,11 +29,11 @@ use parquet::file::serialized_reader::SerializedPageReader;
use parquet::format::PageLocation;
use store_api::storage::RegionId;
use super::helper::fetch_byte_ranges;
use crate::cache::file_cache::IndexKey;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheManagerRef, PageKey, PageValue};
use crate::metrics::READ_STAGE_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::page_reader::CachedPageReader;
/// An in-memory collection of column chunks
@@ -228,7 +228,7 @@ impl<'a> InMemoryRowGroup<'a> {
/// Tries to fetch data from the write cache;
/// if the data is not cached, fetches it from the object store directly.
async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let key = (self.region_id, self.file_id);
let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
match self.fetch_ranges_from_write_cache(key, ranges).await {
Some(data) => Ok(data),
None => {