Merge branch 'main' into chore/test_skip_auth

This commit is contained in:
Lanqing Yang
2025-12-12 07:48:43 +09:00
committed by GitHub
12 changed files with 198 additions and 78 deletions

View File

@@ -564,7 +564,7 @@ fn new_noop_file_purger() -> FilePurgerRef {
#[derive(Debug)]
struct Noop;
impl FilePurger for Noop {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
}
Arc::new(Noop)
}

View File

@@ -47,21 +47,16 @@ pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The keep-alive interval of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = Duration::from_secs(15);
/// The keep-alive timeout of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(5);
/// The default options for the etcd client.
pub fn default_etcd_client_options() -> ConnectOptions {
ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1),
Duration::from_secs(10),
)
.with_keep_alive(Duration::from_secs(15), Duration::from_secs(5))
.with_connect_timeout(Duration::from_secs(10))
}

View File

@@ -228,19 +228,28 @@ impl AccessLayer {
// Delete all versions of the index file.
for version in 0..=index_file_id.version {
let index_id = RegionIndexId::new(*region_file_id, version);
self.delete_index(index_id).await?;
}
Ok(())
}
pub(crate) async fn delete_index(
&self,
index_file_id: RegionIndexId,
) -> Result<(), crate::error::Error> {
let path = location::index_file_path(
&self.table_dir,
RegionIndexId::new(index_file_id.file_id, version),
RegionIndexId::new(index_file_id.file_id, index_file_id.version),
self.path_type,
);
self.object_store
.delete(&path)
.await
.context(DeleteIndexSnafu {
file_id: region_file_id.file_id(),
file_id: index_file_id.file_id(),
})?;
}
Ok(())
}

View File

@@ -21,11 +21,10 @@ use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use store_api::storage::{ColumnId, FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

View File

@@ -22,11 +22,10 @@ use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::FileId;
use store_api::storage::{FileId, IndexVersion};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::IndexVersion;
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

View File

@@ -28,7 +28,7 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, RegionId};
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
@@ -82,8 +82,6 @@ pub type Level = u8;
pub const MAX_LEVEL: Level = 2;
/// Type to store index types for a column.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
/// Index version
pub type IndexVersion = u64;
/// Cross-region file id.
///
@@ -308,6 +306,11 @@ impl FileMeta {
!self.available_indexes.is_empty()
}
/// Returns `true` if this file's index is at least as new as `other`'s.
///
/// Both sides must actually have an index (`exists_index()`); a file
/// without any index is never considered up-to-date, and the comparison
/// itself is done on `index_version`.
pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
self.exists_index() && other.exists_index() && self.index_version >= other.index_version
}
/// Returns true if the file has an inverted index
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
@@ -434,6 +437,16 @@ impl FileHandle {
self.inner.compacting.store(compacting, Ordering::Relaxed);
}
/// Returns whether this handle's index file has been marked as outdated.
///
/// Uses a `Relaxed` load: the flag is a plain boolean marker and no
/// ordering with other memory operations is implied.
pub fn index_outdated(&self) -> bool {
self.inner.index_outdated.load(Ordering::Relaxed)
}
/// Marks (or clears) this handle's index file as outdated.
///
/// The flag is later read when the handle is dropped to decide whether
/// the stale index file should be purged — TODO(review): confirm against
/// the purger's `remove_file` contract.
pub fn set_index_outdated(&self, index_outdated: bool) {
self.inner
.index_outdated
.store(index_outdated, Ordering::Relaxed);
}
/// Returns a reference to the [FileMeta].
pub fn meta_ref(&self) -> &FileMeta {
&self.inner.meta
@@ -471,23 +484,29 @@ struct FileHandleInner {
meta: FileMeta,
compacting: AtomicBool,
deleted: AtomicBool,
index_outdated: AtomicBool,
file_purger: FilePurgerRef,
}
impl Drop for FileHandleInner {
fn drop(&mut self) {
self.file_purger
.remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
self.file_purger.remove_file(
self.meta.clone(),
self.deleted.load(Ordering::Acquire),
self.index_outdated.load(Ordering::Acquire),
);
}
}
impl FileHandleInner {
/// Creates the inner state for a file handle.
///
/// There should only be one `FileHandleInner` for each file on a datanode:
/// the purger is notified of the new file here, and the matching
/// `remove_file` notification happens in `Drop`, so duplicated inners
/// would double-count the file's lifecycle.
fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
// Register the file with the purger so it can track the reference.
file_purger.new_file(&meta);
FileHandleInner {
meta,
// All lifecycle flags start cleared; they are flipped via the
// handle's setters and consumed when the inner value is dropped.
compacting: AtomicBool::new(false),
deleted: AtomicBool::new(false),
index_outdated: AtomicBool::new(false),
file_purger,
}
}
@@ -540,21 +559,62 @@ pub async fn delete_files(
);
for (file_id, index_version) in file_ids {
purge_index_cache_stager(
region_id,
delete_index,
access_layer,
cache_manager,
*file_id,
*index_version,
)
.await;
}
Ok(())
}
/// Deletes the index file identified by `region_index_id` from storage
/// and purges any cached copies of it.
///
/// The index file is first removed through the access layer; afterwards
/// `purge_index_cache_stager` evicts the corresponding entries from the
/// write cache and the puffin stager. Returns an error only if the
/// storage-level deletion fails.
pub async fn delete_index(
region_index_id: RegionIndexId,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
access_layer.delete_index(region_index_id).await?;
// `true` = also drop the cached index (puffin) entry, not just the SST.
purge_index_cache_stager(
region_index_id.region_id(),
true,
access_layer,
cache_manager,
region_index_id.file_id(),
region_index_id.version,
)
.await;
Ok(())
}
async fn purge_index_cache_stager(
region_id: RegionId,
delete_index: bool,
access_layer: &AccessLayerRef,
cache_manager: &Option<CacheManagerRef>,
file_id: FileId,
index_version: u64,
) {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
// Removes index file from the cache.
if delete_index {
write_cache
.remove(IndexKey::new(
region_id,
*file_id,
FileType::Puffin(*index_version),
file_id,
FileType::Puffin(index_version),
))
.await;
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
.remove(IndexKey::new(region_id, file_id, FileType::Parquet))
.await;
}
@@ -562,8 +622,8 @@ pub async fn delete_files(
if let Err(e) = access_layer
.puffin_manager_factory()
.purge_stager(RegionIndexId::new(
RegionFileId::new(region_id, *file_id),
*index_version,
RegionFileId::new(region_id, file_id),
index_version,
))
.await
{
@@ -571,8 +631,6 @@ pub async fn delete_files(
file_id, index_version, region_id);
}
}
Ok(())
}
#[cfg(test)]
mod tests {

View File

@@ -21,7 +21,7 @@ use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileMeta, delete_files};
use crate::sst::file::{FileMeta, delete_files, delete_index};
use crate::sst::file_ref::FileReferenceManagerRef;
/// A worker to delete files in background.
@@ -29,7 +29,8 @@ pub trait FilePurger: Send + Sync + fmt::Debug {
/// Send a request to remove the file.
/// If `is_delete` is true, the file will be deleted from the storage.
/// Otherwise, only the reference will be removed.
fn remove_file(&self, file_meta: FileMeta, is_delete: bool);
/// If `index_outdated` is true, the index file will be deleted regardless of `is_delete`.
fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool);
/// Notify the purger of a new file created.
/// This is useful for object store based storage, where we need to track the file references
@@ -46,7 +47,7 @@ pub type FilePurgerRef = Arc<dyn FilePurger>;
pub struct NoopFilePurger;
impl FilePurger for NoopFilePurger {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
// noop
}
}
@@ -142,12 +143,27 @@ impl LocalFilePurger {
error!(e; "Failed to schedule the file purge request");
}
}
/// Schedules a background task that deletes only the index file
/// associated with `file_meta`, leaving the SST data file itself intact.
///
/// Deletion runs on the purge scheduler so the synchronous caller
/// (e.g. `remove_file`, invoked from a handle's `Drop`) performs no
/// storage I/O inline; failures are logged, not propagated.
fn delete_index(&self, file_meta: FileMeta) {
let sst_layer = self.sst_layer.clone();
let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
let index_id = file_meta.index_id();
if let Err(e) = delete_index(index_id, &sst_layer, &cache_manager).await {
error!(e; "Failed to delete index for file {:?} from storage", file_meta);
}
})) {
error!(e; "Failed to schedule the index purge request");
}
}
}
impl FilePurger for LocalFilePurger {
fn remove_file(&self, file_meta: FileMeta, is_delete: bool) {
fn remove_file(&self, file_meta: FileMeta, is_delete: bool, index_outdated: bool) {
if is_delete {
self.delete_file(file_meta);
} else if index_outdated {
self.delete_index(file_meta);
}
}
}
@@ -158,7 +174,7 @@ pub struct ObjectStoreFilePurger {
}
impl FilePurger for ObjectStoreFilePurger {
fn remove_file(&self, file_meta: FileMeta, _is_delete: bool) {
fn remove_file(&self, file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {
// if not on local file system, instead inform the global file purger to remove the file reference.
// notice that no matter whether the file is deleted or not, we need to remove the reference
// because the file is no longer in use nonetheless.

View File

@@ -503,6 +503,8 @@ pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
#[derive(Clone)]
pub struct IndexBuildTask {
/// The SST file handle to build index for.
pub file: FileHandle,
/// The file meta to build index for.
pub file_meta: FileMeta,
pub reason: IndexBuildType,
@@ -651,10 +653,7 @@ impl IndexBuildTask {
let mut parquet_reader = self
.access_layer
.read_sst(FileHandle::new(
self.file_meta.clone(),
self.file_purger.clone(),
))
.read_sst(self.file.clone()) // use the latest file handle instead of creating a new one
.build()
.await?;
@@ -1498,14 +1497,19 @@ mod tests {
let region_id = metadata.region_id;
let indexer_builder = mock_indexer_builder(metadata, &env).await;
// Create mock task.
let task = IndexBuildTask {
file_meta: FileMeta {
let file_meta = FileMeta {
region_id,
file_id: FileId::random(),
file_size: 100,
..Default::default()
},
};
let file = FileHandle::new(file_meta.clone(), file_purger.clone());
// Create mock task.
let task = IndexBuildTask {
file,
file_meta,
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
@@ -1555,10 +1559,13 @@ mod tests {
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
let file = FileHandle::new(file_meta.clone(), file_purger.clone());
// Create mock task.
let (tx, mut rx) = mpsc::channel(4);
let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
let task = IndexBuildTask {
file,
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
@@ -1626,10 +1633,13 @@ mod tests {
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
let file = FileHandle::new(file_meta.clone(), file_purger.clone());
// Create mock task.
let (tx, _rx) = mpsc::channel(4);
let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
let task = IndexBuildTask {
file,
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
@@ -1726,10 +1736,13 @@ mod tests {
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
let file = FileHandle::new(file_meta.clone(), file_purger.clone());
// Create mock task.
let (tx, mut rx) = mpsc::channel(4);
let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
let task = IndexBuildTask {
file,
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
@@ -1813,10 +1826,13 @@ mod tests {
let version_control =
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let file = FileHandle::new(file_meta.clone(), file_purger.clone());
// Create mock task.
let (tx, mut _rx) = mpsc::channel(4);
let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
let task = IndexBuildTask {
file,
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
@@ -1864,13 +1880,18 @@ mod tests {
let (tx, _rx) = mpsc::channel(4);
let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
IndexBuildTask {
file_meta: FileMeta {
let file_meta = FileMeta {
region_id,
file_id,
file_size: 100,
..Default::default()
},
};
let file = FileHandle::new(file_meta.clone(), file_purger.clone());
IndexBuildTask {
file,
file_meta,
reason,
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),

View File

@@ -57,9 +57,28 @@ impl SstVersion {
) {
for file in files_to_add {
let level = file.level;
let new_index_version = file.index_version;
// If the file already exists, then we should only replace the handle when the index is outdated.
self.levels[level as usize]
.files
.insert(file.file_id, FileHandle::new(file, file_purger.clone()));
.entry(file.file_id)
.and_modify(|f| {
if *f.meta_ref() == file || f.meta_ref().is_index_up_to_date(&file) {
// same file meta or current file handle's index is up-to-date, skip adding
if f.index_id().version > new_index_version {
// what does it mean for us to see older index version?
common_telemetry::warn!(
"Adding file with older index version, existing: {:?}, new: {:?}, ignoring new file",
f.meta_ref(),
file
);
}
} else {
// include case like old file have no index or index is outdated
*f = FileHandle::new(file.clone(), file_purger.clone());
}
})
.or_insert_with(|| FileHandle::new(file.clone(), file_purger.clone()));
}
}

View File

@@ -72,6 +72,7 @@ impl<S> RegionWorkerLoop<S> {
});
IndexBuildTask {
file: file.clone(),
file_meta: file.meta_ref().clone(),
reason: build_type,
access_layer: access_layer.clone(),

View File

@@ -26,6 +26,6 @@ pub use datatypes::schema::{
};
pub use self::descriptors::*;
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, ParseIdError};
pub use self::file::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, ParseIdError};
pub use self::requests::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
pub use self::types::{SequenceNumber, SequenceRange};

View File

@@ -24,6 +24,9 @@ use uuid::Uuid;
use crate::ManifestVersion;
use crate::storage::RegionId;
/// Version number of an SST file's index.
///
/// Higher values denote newer index builds; presumably bumped each time
/// the index for a file is rebuilt — TODO(review): confirm with the
/// index-build pipeline.
pub type IndexVersion = u64;
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
source: uuid::Error,