feat(mito): enable inverted index (#3158)

* feat(mito): enable inverted index

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* accidentally resolved the incorrect filtering issue within the Metric Engine

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Update src/mito2/src/access_layer.rs

* Update src/mito2/src/test_util/scheduler_util.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: format -> join_dir

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: move intermediate_manager from arg of write_and_upload_sst to field of WriteCache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: add IndexerBuilder

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Zhenchi
2024-01-15 17:08:07 +08:00
committed by GitHub
parent 816d94892c
commit 6f07d69155
34 changed files with 916 additions and 235 deletions

View File

@@ -116,6 +116,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""
# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"

View File

@@ -216,6 +216,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false
[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""
# Log options
# [logging]
# Specify logs directory.

View File

@@ -284,6 +284,7 @@ impl DatanodeOptions {
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]

View File

@@ -41,7 +41,7 @@ use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::{start_server, ServerHandlers};
@@ -374,19 +374,11 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
mut config: MitoConfig,
config: MitoConfig,
) -> Result<MitoEngine> {
// Sets write cache path if it is empty.
if config.experimental_write_cache_path.is_empty() {
config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache");
info!(
"Sets write cache path to {}",
config.experimental_write_cache_path
);
}
let mito_engine = match &opts.wal {
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
@@ -394,7 +386,9 @@ impl DatanodeBuilder {
)
.await
.context(BuildMitoEngineSnafu)?,
WalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,

View File

@@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef;
/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
pub trait InvertedIndexCreator {
pub trait InvertedIndexCreator: Send {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built

View File

@@ -13,7 +13,5 @@
// limitations under the License.
#![feature(iter_partition_in_place)]
// TODO(zhongzc): remove once further code is added
#![allow(dead_code)]
pub mod inverted_index;

View File

@@ -25,6 +25,8 @@ use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::IndexerBuilder;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
@@ -37,6 +39,8 @@ pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}
impl std::fmt::Debug for AccessLayer {
@@ -49,10 +53,15 @@ impl std::fmt::Debug for AccessLayer {
impl AccessLayer {
/// Returns a new [AccessLayer] for specific `region_dir`.
pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
pub fn new(
region_dir: impl Into<String>,
object_store: ObjectStore,
intermediate_manager: IntermediateManager,
) -> AccessLayer {
AccessLayer {
region_dir: region_dir.into(),
object_store,
intermediate_manager,
}
}
@@ -105,16 +114,15 @@ impl AccessLayer {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();
let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
// Write to the write cache.
write_cache
.write_and_upload_sst(
request,
SstUploadRequest {
file_id: request.file_id,
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: file_path,
index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
@@ -124,19 +132,30 @@ impl AccessLayer {
.await?
} else {
// Write cache is disabled.
let mut writer =
ParquetWriter::new(file_path, request.metadata, self.object_store.clone());
let indexer = IndexerBuilder {
create_inverted_index: request.create_inverted_index,
mem_threshold_index_create: request.mem_threshold_index_create,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),
}
.build();
let mut writer = ParquetWriter::new(
file_path,
request.metadata,
self.object_store.clone(),
indexer,
);
writer.write_all(request.source, write_opts).await?
};
// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
request.cache_manager.put_parquet_meta_data(
region_id,
request.file_id,
parquet_metadata.clone(),
)
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
}
}
@@ -150,7 +169,12 @@ pub(crate) struct SstWriteRequest {
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
/// Whether to create inverted index.
pub(crate) create_inverted_index: bool,
/// The threshold of memory size to create inverted index.
pub(crate) mem_threshold_index_create: Option<usize>,
}
/// Creates a fs object store with atomic write dir.

View File

@@ -27,12 +27,14 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::new_fs_object_store;
use crate::access_layer::{new_fs_object_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::read::Source;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
@@ -45,6 +47,8 @@ pub struct WriteCache {
file_cache: FileCacheRef,
/// Object store manager.
object_store_manager: ObjectStoreManagerRef,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}
pub type WriteCacheRef = Arc<WriteCache>;
@@ -56,6 +60,7 @@ impl WriteCache {
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity);
file_cache.recover().await?;
@@ -63,6 +68,7 @@ impl WriteCache {
Ok(Self {
file_cache: Arc::new(file_cache),
object_store_manager,
intermediate_manager,
})
}
@@ -71,11 +77,18 @@ impl WriteCache {
cache_dir: &str,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
let local_store = new_fs_object_store(cache_dir).await?;
Self::new(local_store, object_store_manager, cache_capacity).await
Self::new(
local_store,
object_store_manager,
cache_capacity,
intermediate_manager,
)
.await
}
/// Returns the file cache of the write cache.
@@ -84,27 +97,42 @@ impl WriteCache {
}
/// Writes SST to the cache and then uploads it to the remote object store.
pub async fn write_and_upload_sst(
pub(crate) async fn write_and_upload_sst(
&self,
request: SstUploadRequest,
write_request: SstWriteRequest,
upload_request: SstUploadRequest,
write_opts: &WriteOptions,
) -> Result<Option<SstInfo>> {
let timer = FLUSH_ELAPSED
.with_label_values(&["write_sst"])
.start_timer();
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let region_id = write_request.metadata.region_id;
let file_id = write_request.file_id;
let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let indexer = IndexerBuilder {
create_inverted_index: write_request.create_inverted_index,
mem_threshold_index_create: write_request.mem_threshold_index_create,
file_id,
file_path: self.file_cache.cache_file_path(puffin_key),
metadata: &write_request.metadata,
row_group_size: write_opts.row_group_size,
object_store: self.file_cache.local_store(),
intermediate_manager: self.intermediate_manager.clone(),
}
.build();
// Write to FileCache.
let mut writer = ParquetWriter::new(
self.file_cache.cache_file_path(parquet_key),
request.metadata,
write_request.metadata,
self.file_cache.local_store(),
indexer,
);
let sst_info = writer.write_all(request.source, write_opts).await?;
let sst_info = writer.write_all(write_request.source, write_opts).await?;
timer.stop_and_record();
@@ -114,13 +142,13 @@ impl WriteCache {
return Ok(None);
};
let parquet_path = &request.upload_path;
let remote_store = &request.remote_store;
let parquet_path = &upload_request.upload_path;
let remote_store = &upload_request.remote_store;
self.upload(parquet_key, parquet_path, remote_store).await?;
if sst_info.inverted_index_available {
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let puffin_path = &request.index_upload_path;
let puffin_path = &upload_request.index_upload_path;
self.upload(puffin_key, puffin_path, remote_store).await?;
}
@@ -193,10 +221,6 @@ impl WriteCache {
/// Request to write and upload a SST.
pub struct SstUploadRequest {
pub file_id: FileId,
pub metadata: RegionMetadataRef,
pub source: Source,
pub storage: Option<String>,
/// Path to upload the file.
pub upload_path: String,
/// Path to upload the index file.
@@ -212,6 +236,7 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::manager::ObjectStoreManager;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use store_api::storage::RegionId;
@@ -230,10 +255,14 @@ mod tests {
// TODO(QuenKar): maybe find a way to create some object server for testing,
// and now just use local file system to mock.
let mut env = TestEnv::new();
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
let file_id = FileId::random();
let upload_path = sst_file_path("test", file_id);
let index_upload_path = index_file_path("test", file_id);
let intm_mgr = IntermediateManager::init_fs(join_dir(&data_home, "intm"))
.await
.unwrap();
// Create WriteCache
let local_dir = create_temp_dir("");
@@ -243,6 +272,7 @@ mod tests {
local_store.clone(),
object_store_manager,
ReadableSize::mb(10),
intm_mgr,
)
.await
.unwrap();
@@ -256,13 +286,19 @@ mod tests {
new_batch_by_range(&["b", "h"], 100, 200),
]);
let request = SstUploadRequest {
let write_request = SstWriteRequest {
file_id,
metadata,
source,
storage: None,
create_inverted_index: true,
mem_threshold_index_create: None,
cache_manager: Default::default(),
};
let request = SstUploadRequest {
upload_path: upload_path.clone(),
index_upload_path,
index_upload_path: index_upload_path.clone(),
remote_store: mock_store.clone(),
};
@@ -273,7 +309,7 @@ mod tests {
// Write to cache and upload sst to mock remote store
let sst_info = write_cache
.write_and_upload_sst(request, &write_opts)
.write_and_upload_sst(write_request, request, &write_opts)
.await
.unwrap()
.unwrap();
@@ -289,5 +325,16 @@ mod tests {
.await
.unwrap();
assert_eq!(remote_data, cache_data);
// Check write cache contains the index key
let index_key = IndexKey::new(region_id, file_id, FileType::Puffin);
assert!(write_cache.file_cache.contains_key(&index_key));
let remote_index_data = mock_store.read(&index_upload_path).await.unwrap();
let cache_index_data = local_store
.read(&write_cache.file_cache.cache_file_path(index_key))
.await
.unwrap();
assert_eq!(remote_index_data, cache_index_data);
}
}

View File

@@ -21,7 +21,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error};
pub use picker::CompactionPickerRef;
use snafu::ResultExt;
@@ -44,6 +43,7 @@ use crate::sst::file_purger::FilePurgerRef;
/// Region compaction request.
pub struct CompactionRequest {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) current_version: VersionRef,
pub(crate) access_layer: AccessLayerRef,
/// Sender to send notification to the region worker.
@@ -53,8 +53,6 @@ pub struct CompactionRequest {
pub(crate) file_purger: FilePurgerRef,
/// Start time of compaction task.
pub(crate) start_time: Instant,
/// Buffering threshold while writing SST files.
pub(crate) sst_write_buffer_size: ReadableSize,
pub(crate) cache_manager: CacheManagerRef,
}
@@ -331,13 +329,13 @@ impl CompactionStatus {
let current_version = self.version_control.current().version;
let start_time = Instant::now();
let mut req = CompactionRequest {
engine_config,
current_version,
access_layer: self.access_layer.clone(),
request_sender: request_sender.clone(),
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
start_time,
sst_write_buffer_size: engine_config.sst_write_buffer_size,
cache_manager,
};
@@ -363,7 +361,7 @@ mod tests {
#[tokio::test]
async fn test_schedule_empty() {
let env = SchedulerEnv::new();
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
@@ -432,7 +430,7 @@ mod tests {
#[tokio::test]
async fn test_schedule_on_finished() {
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().scheduler(job_scheduler.clone());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();

View File

@@ -17,7 +17,6 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info};
use common_time::timestamp::TimeUnit;
use common_time::timestamp_millis::BucketAligned;
@@ -32,6 +31,7 @@ use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::CompactionRequest;
use crate::config::MitoConfig;
use crate::error::{self, CompactRegionSnafu};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
@@ -123,13 +123,13 @@ impl TwcsPicker {
impl Picker for TwcsPicker {
fn pick(&self, req: CompactionRequest) -> Option<Box<dyn CompactionTask>> {
let CompactionRequest {
engine_config,
current_version,
access_layer,
request_sender,
waiters,
file_purger,
start_time,
sst_write_buffer_size,
cache_manager,
} = req;
@@ -173,12 +173,12 @@ impl Picker for TwcsPicker {
return None;
}
let task = TwcsCompactionTask {
engine_config,
region_id,
metadata: region_metadata,
sst_layer: access_layer,
outputs,
expired_ssts,
sst_write_buffer_size,
compaction_time_window: Some(time_window_size),
request_sender,
waiters,
@@ -234,12 +234,12 @@ fn find_latest_window_in_seconds<'a>(
}
pub(crate) struct TwcsCompactionTask {
pub engine_config: Arc<MitoConfig>,
pub region_id: RegionId,
pub metadata: RegionMetadataRef,
pub sst_layer: AccessLayerRef,
pub outputs: Vec<CompactionOutput>,
pub expired_ssts: Vec<FileHandle>,
pub sst_write_buffer_size: ReadableSize,
pub compaction_time_window: Option<i64>,
pub file_purger: FilePurgerRef,
/// Request sender to notify the worker.
@@ -301,9 +301,20 @@ impl TwcsCompactionTask {
);
let write_opts = WriteOptions {
write_buffer_size: self.sst_write_buffer_size,
write_buffer_size: self.engine_config.sst_write_buffer_size,
..Default::default()
};
let create_inverted_index = self
.engine_config
.inverted_index
.create_on_compaction
.auto();
let mem_threshold_index_create = self
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let metadata = self.metadata.clone();
let sst_layer = self.sst_layer.clone();
let region_id = self.region_id;
@@ -321,6 +332,8 @@ impl TwcsCompactionTask {
source: Source::Reader(reader),
cache_manager,
storage,
create_inverted_index,
mem_threshold_index_create,
},
&write_opts,
)

View File

@@ -18,10 +18,11 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use object_store::util::join_dir;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use serde_with::{serde_as, NoneAsEmptyString};
use crate::error::{InvalidConfigSnafu, Result};
use crate::error::Result;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
@@ -72,7 +73,7 @@ pub struct MitoConfig {
pub page_cache_size: ReadableSize,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// Path for write cache.
/// File system path for write cache, defaults to `{data_home}/write_cache`.
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
@@ -89,6 +90,9 @@ pub struct MitoConfig {
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
pub allow_stale_entries: bool,
/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,
}
impl Default for MitoConfig {
@@ -113,6 +117,7 @@ impl Default for MitoConfig {
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
}
}
}
@@ -121,7 +126,7 @@ impl MitoConfig {
/// Sanitize incorrect configurations.
///
/// Returns an error if there is a configuration that unable to sanitize.
pub(crate) fn sanitize(&mut self) -> Result<()> {
pub(crate) fn sanitize(&mut self, data_home: &str) -> Result<()> {
// Use default value if `num_workers` is 0.
if self.num_workers == 0 {
self.num_workers = divide_num_cpus(2);
@@ -167,13 +172,75 @@ impl MitoConfig {
);
}
if self.enable_experimental_write_cache {
ensure!(
!self.experimental_write_cache_path.is_empty(),
InvalidConfigSnafu {
reason: "experimental_write_cache_path should not be empty",
}
);
// Sets write cache path if it is empty.
if self.experimental_write_cache_path.is_empty() {
self.experimental_write_cache_path = join_dir(data_home, "write_cache");
}
self.inverted_index.sanitize(data_home)?;
Ok(())
}
}
/// Operational mode for certain actions.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// The action is performed automatically based on internal criteria.
    #[default]
    Auto,
    /// The action is explicitly disabled.
    Disable,
}

impl Mode {
    /// Returns `true` if the action is explicitly disabled.
    pub fn disabled(&self) -> bool {
        match self {
            Mode::Disable => true,
            Mode::Auto => false,
        }
    }

    /// Returns `true` if the action is performed automatically.
    pub fn auto(&self) -> bool {
        match self {
            Mode::Auto => true,
            Mode::Disable => false,
        }
    }
}
/// Configuration options for the inverted index.
///
/// All fields fall back to their defaults when absent from the
/// configuration file (`#[serde(default)]`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct InvertedIndexConfig {
/// Whether to create the index on flush: automatically or never.
pub create_on_flush: Mode,
/// Whether to create the index on compaction: automatically or never.
pub create_on_compaction: Mode,
/// Whether to apply the index on query: automatically or never.
pub apply_on_query: Mode,
/// Memory threshold for performing an external sort during index creation.
/// `None` means all sorting will happen in memory. An empty string in the
/// configuration file deserializes to `None` (via `NoneAsEmptyString`).
#[serde_as(as = "NoneAsEmptyString")]
pub mem_threshold_on_create: Option<ReadableSize>,
/// File system path to store intermediate files for external sort, defaults to `{data_home}/index_intermediate`.
pub intermediate_path: String,
}
impl Default for InvertedIndexConfig {
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: Some(ReadableSize::mb(64)),
intermediate_path: String::new(),
}
}
}
impl InvertedIndexConfig {
pub fn sanitize(&mut self, data_home: &str) -> Result<()> {
if self.intermediate_path.is_empty() {
self.intermediate_path = join_dir(data_home, "index_intermediate");
}
Ok(())

View File

@@ -78,11 +78,12 @@ pub struct MitoEngine {
impl MitoEngine {
/// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
pub async fn new<S: LogStore>(
data_home: &str,
mut config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> Result<MitoEngine> {
config.sanitize()?;
config.sanitize(data_home)?;
Ok(MitoEngine {
inner: Arc::new(EngineInner::new(config, log_store, object_store_manager).await?),
@@ -192,7 +193,8 @@ impl EngineInner {
request,
Some(cache_manager),
)
.with_parallelism(scan_parallelism);
.with_parallelism(scan_parallelism)
.ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled());
scan_region.scanner()
}
@@ -315,13 +317,14 @@ impl RegionEngine for MitoEngine {
impl MitoEngine {
/// Returns a new [MitoEngine] for tests.
pub async fn new_for_test<S: LogStore>(
data_home: &str,
mut config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> Result<MitoEngine> {
config.sanitize()?;
config.sanitize(data_home)?;
let config = Arc::new(config);
Ok(MitoEngine {

View File

@@ -550,8 +550,8 @@ async fn test_region_usage() {
let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
assert_eq!(region_stat.sst_usage, 2742);
assert_eq!(region_stat.sst_usage, 3006);
// region total usage
assert_eq!(region_stat.disk_usage(), 3791);
assert_eq!(region_stat.disk_usage(), 4072);
}

View File

@@ -315,6 +315,12 @@ impl RegionFlushTask {
let file_id = FileId::random();
let iter = mem.iter(None, None);
let source = Source::Iter(iter);
let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto();
let mem_threshold_index_create = self
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
// Flush to level 0.
let write_request = SstWriteRequest {
@@ -323,6 +329,8 @@ impl RegionFlushTask {
source,
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
create_inverted_index,
mem_threshold_index_create,
};
let Some(sst_info) = self
.access_layer
@@ -732,7 +740,7 @@ mod tests {
#[tokio::test]
async fn test_schedule_empty() {
let env = SchedulerEnv::new();
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_flush_scheduler();
let builder = VersionControlBuilder::new();

View File

@@ -122,6 +122,8 @@ pub(crate) struct ScanRegion {
cache_manager: Option<CacheManagerRef>,
/// Parallelism to scan.
parallelism: ScanParallism,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
}
impl ScanRegion {
@@ -138,6 +140,7 @@ impl ScanRegion {
request,
cache_manager,
parallelism: ScanParallism::default(),
ignore_inverted_index: false,
}
}
@@ -148,6 +151,12 @@ impl ScanRegion {
self
}
#[must_use]
pub(crate) fn ignore_inverted_index(mut self, ignore: bool) -> Self {
self.ignore_inverted_index = ignore;
self
}
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
self.seq_scan().map(Scanner::Seq)
@@ -234,6 +243,10 @@ impl ScanRegion {
/// Use the latest schema to build the index applier.
fn build_index_applier(&self) -> Option<SstIndexApplierRef> {
if self.ignore_inverted_index {
return None;
}
let file_cache = || -> Option<FileCacheRef> {
let cache_manager = self.cache_manager.as_ref()?;
let write_cache = cache_manager.write_cache()?;

View File

@@ -45,6 +45,7 @@ use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::wal::{EntryId, Wal};
/// Builder to create a new [MitoRegion] or open an existing one.
@@ -58,6 +59,7 @@ pub(crate) struct RegionOpener {
options: Option<RegionOptions>,
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
intermediate_manager: IntermediateManager,
}
impl RegionOpener {
@@ -68,6 +70,7 @@ impl RegionOpener {
memtable_builder: MemtableBuilderRef,
object_store_manager: ObjectStoreManagerRef,
scheduler: SchedulerRef,
intermediate_manager: IntermediateManager,
) -> RegionOpener {
RegionOpener {
region_id,
@@ -79,6 +82,7 @@ impl RegionOpener {
options: None,
cache_manager: None,
skip_wal_replay: false,
intermediate_manager,
}
}
@@ -170,7 +174,11 @@ impl RegionOpener {
.options(options)
.build();
let version_control = Arc::new(VersionControl::new(version));
let access_layer = Arc::new(AccessLayer::new(self.region_dir, object_store));
let access_layer = Arc::new(AccessLayer::new(
self.region_dir,
object_store,
self.intermediate_manager,
));
Ok(MitoRegion {
region_id,
@@ -240,7 +248,11 @@ impl RegionOpener {
let region_id = self.region_id;
let object_store = self.object_store(&region_options.storage)?.clone();
let access_layer = Arc::new(AccessLayer::new(self.region_dir.clone(), object_store));
let access_layer = Arc::new(AccessLayer::new(
self.region_dir.clone(),
object_store,
self.intermediate_manager.clone(),
));
let file_purger = Arc::new(LocalFilePurger::new(
self.scheduler.clone(),
access_layer.clone(),

View File

@@ -97,6 +97,7 @@ impl FilePurger for LocalFilePurger {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use smallvec::SmallVec;
@@ -104,6 +105,7 @@ mod tests {
use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::location;
#[tokio::test]
@@ -111,17 +113,21 @@ mod tests {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("file-purge");
let dir_path = dir.path().display().to_string();
let mut builder = Fs::default();
builder.root(dir.path().to_str().unwrap());
let object_store = ObjectStore::new(builder).unwrap().finish();
builder.root(&dir_path);
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = location::sst_file_path(sst_dir, sst_file_id);
let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm"))
.await
.unwrap();
let object_store = ObjectStore::new(builder).unwrap().finish();
object_store.write(&path, vec![0; 4096]).await.unwrap();
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone()));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));
@@ -152,13 +158,17 @@ mod tests {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("file-purge");
let dir_path = dir.path().display().to_string();
let mut builder = Fs::default();
builder.root(dir.path().to_str().unwrap());
let object_store = ObjectStore::new(builder).unwrap().finish();
builder.root(&dir_path);
let sst_file_id = FileId::random();
let sst_dir = "table1";
let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm"))
.await
.unwrap();
let path = location::sst_file_path(sst_dir, sst_file_id);
let object_store = ObjectStore::new(builder).unwrap().finish();
object_store.write(&path, vec![0; 4096]).await.unwrap();
let index_path = location::index_file_path(sst_dir, sst_file_id);
@@ -168,7 +178,7 @@ mod tests {
.unwrap();
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone()));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));

View File

@@ -12,13 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
pub mod applier;
pub(crate) mod applier;
mod codec;
pub mod creator;
pub(crate) mod creator;
pub(crate) mod intermediate;
mod store;
use std::num::NonZeroUsize;
use common_telemetry::{debug, warn};
use creator::SstIndexCreator;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";
// TODO(zhongzc): how to determine this value?
@@ -27,3 +38,267 @@ const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192;
/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
    file_id: FileId,
    region_id: RegionId,
    inner: Option<SstIndexCreator>,
}

impl Indexer {
    /// Feeds a batch of rows to the underlying index creator, if any.
    ///
    /// On failure the error is logged and index creation is disabled for
    /// the rest of this SST (the inner creator is dropped), so writing the
    /// data file itself is never interrupted.
    pub async fn update(&mut self, batch: &Batch) {
        let Some(creator) = self.inner.as_mut() else {
            return;
        };

        if let Err(err) = creator.update(batch).await {
            warn!(
                err; "Failed to update index, skip creating index, region_id: {}, file_id: {}",
                self.region_id, self.file_id,
            );
            // Skip index creation if error occurs.
            self.inner = None;
        }
    }

    /// Finalizes index creation.
    ///
    /// Returns the number of bytes written on success, or `None` when
    /// indexing was disabled or the finishing step failed (the failure is
    /// logged rather than propagated).
    pub async fn finish(&mut self) -> Option<usize> {
        let mut creator = self.inner.take()?;

        match creator.finish().await {
            Ok((row_count, byte_count)) => {
                debug!(
                    "Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}",
                    self.region_id, self.file_id, byte_count, row_count
                );
                Some(byte_count)
            }
            Err(err) => {
                warn!(
                    err; "Failed to create index, region_id: {}, file_id: {}",
                    self.region_id, self.file_id,
                );
                None
            }
        }
    }

    /// Aborts index creation, discarding any partially built index.
    /// Errors are logged and swallowed.
    pub async fn abort(&mut self) {
        let Some(mut creator) = self.inner.take() else {
            return;
        };

        if let Err(err) = creator.abort().await {
            warn!(
                err; "Failed to abort index, region_id: {}, file_id: {}",
                self.region_id, self.file_id,
            );
        }
    }
}
/// Builder that validates its inputs and assembles an [Indexer].
///
/// If any precondition for index creation is not met, [`build`](Self::build)
/// returns a no-op [Indexer] (one without an inner creator) instead of failing,
/// so SST writing proceeds without an index.
pub(crate) struct IndexerBuilder<'a> {
    /// Whether creating an inverted index was requested for this SST.
    pub(crate) create_inverted_index: bool,
    /// Memory threshold for index creation. Per the engine config, `None`
    /// disables external sorting so all sorting happens in memory.
    pub(crate) mem_threshold_index_create: Option<usize>,
    /// ID of the SST file being written.
    pub(crate) file_id: FileId,
    /// Path of the index file to write.
    pub(crate) file_path: String,
    /// Metadata of the region the SST belongs to.
    pub(crate) metadata: &'a RegionMetadataRef,
    /// Number of rows per row group; forwarded to the index creator.
    pub(crate) row_group_size: usize,
    /// Store used to write the index file.
    pub(crate) object_store: ObjectStore,
    /// Provides storage for intermediate files used during index creation.
    pub(crate) intermediate_manager: IntermediateManager,
}

impl<'a> IndexerBuilder<'a> {
    /// Sanity check for arguments and create a new [Indexer]
    /// with inner [SstIndexCreator] if arguments are valid.
    pub(crate) fn build(self) -> Indexer {
        // Index creation explicitly disabled by the caller.
        if !self.create_inverted_index {
            debug!(
                "Skip creating index due to request, region_id: {}, file_id: {}",
                self.metadata.region_id, self.file_id,
            );
            return Indexer::default();
        }

        // The inverted index only covers tag (primary key) columns;
        // nothing to index without them.
        if self.metadata.primary_key.is_empty() {
            debug!(
                "No tag columns, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, self.file_id,
            );
            return Indexer::default();
        }

        // The creator requires a non-zero row group size; a zero here is a
        // misconfiguration, hence `warn!` rather than `debug!`.
        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
            warn!(
                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, self.file_id,
            );
            return Indexer::default();
        };

        let creator = SstIndexCreator::new(
            self.file_path,
            self.file_id,
            self.metadata,
            self.object_store,
            self.intermediate_manager,
            self.mem_threshold_index_create,
            row_group_size,
        );

        Indexer {
            file_id: self.file_id,
            region_id: self.metadata.region_id,
            inner: Some(creator),
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;

    /// Region metadata with one tag, one field and one timestamp column.
    fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

    /// Region metadata without any tag column (empty primary key).
    fn no_tag_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            });

        Arc::new(builder.build().unwrap())
    }

    /// In-memory object store for tests.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    fn mock_intm_mgr() -> IntermediateManager {
        IntermediateManager::new(mock_object_store())
    }

    /// Builds an [Indexer] with the given knobs and default mock inputs,
    /// removing the duplicated builder literal from each test.
    fn build_indexer(
        create_inverted_index: bool,
        row_group_size: usize,
        metadata: &RegionMetadataRef,
    ) -> Indexer {
        IndexerBuilder {
            create_inverted_index,
            mem_threshold_index_create: Some(1024),
            file_id: FileId::random(),
            file_path: "test".to_string(),
            metadata,
            row_group_size,
            object_store: mock_object_store(),
            intermediate_manager: mock_intm_mgr(),
        }
        .build()
    }

    #[test]
    fn test_build_indexer_basic() {
        // All preconditions satisfied: an inner creator must be built.
        let metadata = mock_region_metadata();
        let indexer = build_indexer(true, 1024, &metadata);
        assert!(indexer.inner.is_some());
    }

    #[test]
    fn test_build_indexer_disable_create() {
        // Index creation disabled by request: no inner creator.
        let metadata = mock_region_metadata();
        let indexer = build_indexer(false, 1024, &metadata);
        assert!(indexer.inner.is_none());
    }

    #[test]
    fn test_build_indexer_no_tag() {
        // Empty primary key (no tags): no inner creator.
        let metadata = no_tag_region_metadata();
        let indexer = build_indexer(true, 1024, &metadata);
        assert!(indexer.inner.is_none());
    }

    #[test]
    fn test_build_indexer_zero_row_group() {
        // Zero row group size is invalid: no inner creator.
        let metadata = mock_region_metadata();
        let indexer = build_indexer(true, 0, &metadata);
        assert!(indexer.inner.is_none());
    }
}

View File

@@ -43,22 +43,19 @@ use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::creator::statistics::Statistics;
use crate::sst::index::creator::temp_provider::TempFileProvider;
use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager};
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::{
INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB,
};
use crate::sst::location::{self, IntermediateLocation};
type ByteCount = usize;
type RowCount = usize;
/// Creates SST index.
pub struct SstIndexCreator {
/// Directory of the region.
region_dir: String,
/// ID of the SST file.
sst_file_id: FileId,
/// Path of index file to write.
file_path: String,
/// The store to write index files.
store: InstrumentedStore,
/// The index creator.
@@ -81,11 +78,11 @@ impl SstIndexCreator {
/// Creates a new `SstIndexCreator`.
/// Should ensure that the number of tag columns is greater than 0.
pub fn new(
region_dir: String,
file_path: String,
sst_file_id: FileId,
metadata: &RegionMetadataRef,
index_store: ObjectStore,
intermediate_store: ObjectStore, // prefer to use local store
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
row_group_size: NonZeroUsize,
) -> Self {
@@ -95,16 +92,15 @@ impl SstIndexCreator {
(threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD)
});
let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&region_dir, &sst_file_id),
InstrumentedStore::new(intermediate_store),
IntermediateLocation::new(&metadata.region_id, &sst_file_id),
intermediate_manager,
));
let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold);
let index_creator = Box::new(SortIndexCreator::new(sorter, row_group_size));
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
Self {
region_dir,
sst_file_id,
file_path,
store: InstrumentedStore::new(index_store),
codec,
index_creator,
@@ -129,10 +125,7 @@ impl SstIndexCreator {
if let Err(update_err) = self.do_update(batch).await {
// clean up garbage if failed to update
if let Err(err) = self.do_cleanup().await {
warn!(
err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}",
self.region_dir, self.sst_file_id,
);
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
}
return Err(update_err);
}
@@ -153,10 +146,7 @@ impl SstIndexCreator {
let finish_res = self.do_finish().await;
// clean up garbage no matter finish successfully or not
if let Err(err) = self.do_cleanup().await {
warn!(
err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}",
self.region_dir, self.sst_file_id,
);
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
}
finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
@@ -216,11 +206,10 @@ impl SstIndexCreator {
async fn do_finish(&mut self) -> Result<()> {
let mut guard = self.stats.record_finish();
let file_path = location::index_file_path(&self.region_dir, self.sst_file_id);
let file_writer = self
.store
.writer(
&file_path,
&self.file_path,
&INDEX_PUFFIN_WRITE_BYTES_TOTAL,
&INDEX_PUFFIN_WRITE_OP_TOTAL,
&INDEX_PUFFIN_FLUSH_OP_TOTAL,

View File

@@ -27,16 +27,15 @@ use crate::metrics::{
INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL,
INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL,
};
use crate::sst::index::store::InstrumentedStore;
use crate::sst::location::IntermediateLocation;
use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager};
/// `TempFileProvider` implements `ExternalTempFileProvider`.
/// It uses `InstrumentedStore` to create and read intermediate files.
pub(crate) struct TempFileProvider {
/// Provides the location of intermediate files.
location: IntermediateLocation,
/// Provides access to files in the object store.
store: InstrumentedStore,
/// Provides store to access to intermediate files.
manager: IntermediateManager,
}
#[async_trait]
@@ -48,7 +47,8 @@ impl ExternalTempFileProvider for TempFileProvider {
) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
let path = self.location.file_path(column_id, file_id);
let writer = self
.store
.manager
.store()
.writer(
&path,
&INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL,
@@ -67,7 +67,8 @@ impl ExternalTempFileProvider for TempFileProvider {
) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
let column_path = self.location.column_path(column_id);
let entries = self
.store
.manager
.store()
.list(&column_path)
.await
.map_err(BoxedError::new)
@@ -81,7 +82,8 @@ impl ExternalTempFileProvider for TempFileProvider {
}
let reader = self
.store
.manager
.store()
.reader(
entry.path(),
&INDEX_INTERMEDIATE_READ_BYTES_TOTAL,
@@ -100,30 +102,35 @@ impl ExternalTempFileProvider for TempFileProvider {
impl TempFileProvider {
/// Creates a new `TempFileProvider`.
pub fn new(location: IntermediateLocation, store: InstrumentedStore) -> Self {
Self { location, store }
pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self {
Self { location, manager }
}
/// Removes all intermediate files.
pub async fn cleanup(&self) -> Result<()> {
self.store.remove_all(self.location.root_path()).await
self.manager
.store()
.remove_all(self.location.root_path())
.await
}
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir;
use futures::{AsyncReadExt, AsyncWriteExt};
use object_store::services::Memory;
use object_store::ObjectStore;
use store_api::storage::RegionId;
use super::*;
use crate::sst::file::FileId;
#[tokio::test]
async fn test_temp_file_provider_basic() {
let location = IntermediateLocation::new("region_dir", &FileId::random());
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
let store = InstrumentedStore::new(object_store);
let temp_dir = temp_dir::create_temp_dir("intermediate");
let path = temp_dir.path().display().to_string();
let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random());
let store = IntermediateManager::init_fs(path).await.unwrap();
let provider = TempFileProvider::new(location.clone(), store);
let column_name = "tag0";
@@ -163,7 +170,8 @@ mod tests {
provider.cleanup().await.unwrap();
assert!(provider
.store
.manager
.store()
.list(location.root_path())
.await
.unwrap()

View File

@@ -0,0 +1,153 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::warn;
use object_store::util::{self, normalize_dir};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::access_layer::new_fs_object_store;
use crate::error::Result;
use crate::sst::file::FileId;
use crate::sst::index::store::InstrumentedStore;
/// Root directory (relative to the manager's store) holding all
/// intermediate files produced during external sorting.
const INTERMEDIATE_DIR: &str = "__intm";

/// `IntermediateManager` provides access to the store that holds
/// intermediate files used during index creation.
#[derive(Clone)]
pub struct IntermediateManager {
    // Instrumented object store rooted at the configured intermediate path.
    store: InstrumentedStore,
}

impl IntermediateManager {
    /// Create a new `IntermediateManager` with the given root path.
    /// It will clean up all garbage intermediate files from previous runs.
    pub async fn init_fs(root_path: impl AsRef<str>) -> Result<Self> {
        let store = new_fs_object_store(&normalize_dir(root_path.as_ref())).await?;
        let store = InstrumentedStore::new(store);

        // Remove all garbage intermediate files from previous runs.
        // Failure is non-fatal (only wastes disk space), so just log it.
        if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await {
            warn!(err; "Failed to remove garbage intermediate files");
        }

        Ok(Self { store })
    }

    /// Returns the instrumented store used to access intermediate files.
    pub(crate) fn store(&self) -> &InstrumentedStore {
        &self.store
    }

    /// Test-only constructor wrapping an arbitrary object store
    /// (skips the filesystem setup and garbage cleanup of `init_fs`).
    #[cfg(test)]
    pub(crate) fn new(store: object_store::ObjectStore) -> Self {
        Self {
            store: InstrumentedStore::new(store),
        }
    }
}
/// `IntermediateLocation` produces paths for intermediate files
/// during external sorting.
///
/// All paths are relative to the store held by [`IntermediateManager`].
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
    // Root directory for this sort run; ends with a trailing slash.
    root_path: String,
}

impl IntermediateLocation {
    /// Create a new `IntermediateLocation`. Set the root directory to
    /// `__intm/{region_id}/{sst_file_id}/{uuid}/`, incorporating
    /// uuid to differentiate active sorting files from orphaned data due to unexpected
    /// process termination.
    pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self {
        // Use the numeric form of the region id in the path.
        let region_id = region_id.as_u64();
        let uuid = Uuid::new_v4();
        Self {
            root_path: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"),
        }
    }

    /// Returns the root directory of the intermediate files
    pub fn root_path(&self) -> &str {
        &self.root_path
    }

    /// Returns the path of the directory for intermediate files associated with a column:
    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/`
    pub fn column_path(&self, column_id: &str) -> String {
        util::join_path(&self.root_path, &format!("{column_id}/"))
    }

    /// Returns the path of the intermediate file with the given id for a column:
    /// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im`
    pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String {
        util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im"))
    }
}
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir;
    use regex::Regex;

    use super::*;

    /// `init_fs` must remove leftover intermediate files from previous runs.
    #[tokio::test]
    async fn test_manager() {
        let temp_dir = temp_dir::create_temp_dir("index_intermediate");
        let path = temp_dir.path().to_str().unwrap();

        // write a garbage file
        tokio::fs::create_dir_all(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap();
        tokio::fs::write(format!("{path}/{INTERMEDIATE_DIR}/garbage.im"), "blahblah")
            .await
            .unwrap();

        let _manager = IntermediateManager::init_fs(path).await.unwrap();

        // cleaned up by `init_fs`
        assert!(!tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}"))
            .await
            .unwrap());
    }

    /// Verifies the path layout `__intm/{region_id}/{sst_file_id}/{uuid}/...`.
    #[test]
    fn test_intermediate_location() {
        let sst_file_id = FileId::random();
        let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id);

        // The uuid segment is random, so match it with a regex.
        let re = Regex::new(&format!(
            "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/",
            r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
        ))
        .unwrap();
        assert!(re.is_match(location.root_path()));

        // Segment index 3 of `__intm/0/{sst_file_id}/{uuid}/` is the uuid.
        let uuid = location.root_path().split('/').nth(3).unwrap();

        let column_id = "1";
        assert_eq!(
            location.column_path(column_id),
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/")
        );

        let im_file_id = "000000000010";
        assert_eq!(
            location.file_path(column_id, im_file_id),
            format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im")
        );
    }
}

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use object_store::util;
use uuid::Uuid;
use crate::sst::file::FileId;
@@ -30,48 +29,8 @@ pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String {
util::join_path(&dir, &sst_file_id.as_puffin())
}
/// `IntermediateLocation` produces paths for intermediate files
/// during external sorting.
#[derive(Debug, Clone)]
pub struct IntermediateLocation {
root_path: String,
}
impl IntermediateLocation {
/// Create a new `IntermediateLocation`. Set the root directory to
/// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/`, incorporating
/// uuid to differentiate active sorting files from orphaned data due to unexpected
/// process termination.
pub fn new(region_dir: &str, sst_file_id: &FileId) -> Self {
let uuid = Uuid::new_v4();
let child = format!("index/__intermediate/{sst_file_id}/{uuid}/");
Self {
root_path: util::join_path(region_dir, &child),
}
}
/// Returns the root directory of the intermediate files
pub fn root_path(&self) -> &str {
&self.root_path
}
/// Returns the path of the directory for intermediate files associated with a column:
/// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/`
pub fn column_path(&self, column_id: &str) -> String {
util::join_path(&self.root_path, &format!("{column_id}/"))
}
/// Returns the path of the intermediate file with the given id for a column:
/// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im`
pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String {
util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im"))
}
}
#[cfg(test)]
mod tests {
use regex::Regex;
use super::*;
#[test]
@@ -91,33 +50,4 @@ mod tests {
format!("region_dir/index/{file_id}.puffin")
);
}
#[test]
fn test_intermediate_location() {
let sst_file_id = FileId::random();
let location = IntermediateLocation::new("region_dir", &sst_file_id);
let re = Regex::new(&format!(
"region_dir/index/__intermediate/{sst_file_id}/{}/",
r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}"
))
.unwrap();
assert!(re.is_match(location.root_path()));
let uuid = location.root_path().split('/').nth(4).unwrap();
let column_id = "1";
assert_eq!(
location.column_path(column_id),
format!("region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/")
);
let im_file_id = "000000000010";
assert_eq!(
location.file_path(column_id, im_file_id),
format!(
"region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im"
)
);
}
}

View File

@@ -80,6 +80,7 @@ mod tests {
use super::*;
use crate::cache::{CacheManager, PageKey};
use crate::sst::index::Indexer;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::test_util::sst_util::{
@@ -107,7 +108,12 @@ mod tests {
..Default::default()
};
let mut writer = ParquetWriter::new(file_path, metadata, object_store.clone());
let mut writer = ParquetWriter::new(
file_path,
metadata,
object_store.clone(),
Indexer::default(),
);
let info = writer
.write_all(source, &write_opts)
.await
@@ -156,7 +162,12 @@ mod tests {
..Default::default()
};
// Prepare data.
let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone());
let mut writer = ParquetWriter::new(
file_path,
metadata.clone(),
object_store.clone(),
Indexer::default(),
);
writer
.write_all(source, &write_opts)
.await
@@ -225,7 +236,12 @@ mod tests {
// write the sst file and get sst info
// sst info contains the parquet metadata, which is converted from FileMetaData
let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone());
let mut writer = ParquetWriter::new(
file_path,
metadata.clone(),
object_store.clone(),
Indexer::default(),
);
let sst_info = writer
.write_all(source, &write_opts)
.await

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use common_datasource::file_format::parquet::BufferedWriter;
use common_telemetry::debug;
use common_time::Timestamp;
use futures::TryFutureExt;
use object_store::ObjectStore;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
@@ -28,10 +29,11 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use super::helper::parse_parquet_metadata;
use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu};
use crate::read::{Batch, Source};
use crate::sst::index::Indexer;
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
/// Parquet SST writer.
@@ -41,6 +43,7 @@ pub struct ParquetWriter {
/// Region metadata of the source and the target SST.
metadata: RegionMetadataRef,
object_store: ObjectStore,
indexer: Indexer,
}
impl ParquetWriter {
@@ -49,11 +52,13 @@ impl ParquetWriter {
file_path: String,
metadata: RegionMetadataRef,
object_store: ObjectStore,
indexer: Indexer,
) -> ParquetWriter {
ParquetWriter {
file_path,
metadata,
object_store,
indexer,
}
}
@@ -90,16 +95,22 @@ impl ParquetWriter {
.context(WriteBufferSnafu)?;
let mut stats = SourceStats::default();
while let Some(batch) = source.next_batch().await? {
while let Some(batch) = write_next_batch(&mut source, &write_format, &mut buffered_writer)
.or_else(|err| async {
// abort index creation if error occurs.
self.indexer.abort().await;
Err(err)
})
.await?
{
stats.update(&batch);
let arrow_batch = write_format.convert_batch(&batch)?;
buffered_writer
.write(&arrow_batch)
.await
.context(WriteBufferSnafu)?;
self.indexer.update(&batch).await;
}
let index_size = self.indexer.finish().await;
let inverted_index_available = index_size.is_some();
let index_file_size = index_size.unwrap_or(0) as u64;
if stats.num_rows == 0 {
debug!(
"No data written, try to stop the writer: {}",
@@ -124,8 +135,8 @@ impl ParquetWriter {
file_size,
num_rows: stats.num_rows,
file_metadata: Some(Arc::new(parquet_metadata)),
inverted_index_available: false,
index_file_size: 0,
inverted_index_available,
index_file_size,
}))
}
@@ -149,6 +160,24 @@ impl ParquetWriter {
}
}
/// Fetches the next batch from `source`, converts it to Arrow format and
/// writes it to `buffered_writer`.
///
/// Returns `Ok(None)` when the source is exhausted; otherwise returns the
/// batch that was just written so the caller can keep processing it.
async fn write_next_batch(
    source: &mut Source,
    write_format: &WriteFormat,
    buffered_writer: &mut BufferedWriter,
) -> Result<Option<Batch>> {
    let batch = match source.next_batch().await? {
        Some(batch) => batch,
        None => return Ok(None),
    };

    let arrow_batch = write_format.convert_batch(&batch)?;
    buffered_writer
        .write(&arrow_batch)
        .await
        .context(WriteBufferSnafu)?;

    Ok(Some(batch))
}
#[derive(Default)]
struct SourceStats {
/// Number of rows fetched.

View File

@@ -136,7 +136,8 @@ impl TestEnv {
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new(config, logstore, object_store_manager)
let data_home = self.data_home().display().to_string();
MitoEngine::new(&data_home, config, logstore, object_store_manager)
.await
.unwrap()
}
@@ -145,8 +146,8 @@ impl TestEnv {
pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine {
let logstore = self.logstore.as_ref().unwrap().clone();
let object_store_manager = self.object_store_manager.as_ref().unwrap().clone();
MitoEngine::new(config, logstore, object_store_manager)
let data_home = self.data_home().display().to_string();
MitoEngine::new(&data_home, config, logstore, object_store_manager)
.await
.unwrap()
}
@@ -164,9 +165,19 @@ impl TestEnv {
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
.await
.unwrap()
let data_home = self.data_home().display().to_string();
MitoEngine::new_for_test(
&data_home,
config,
logstore,
object_store_manager,
manager,
listener,
)
.await
.unwrap()
}
pub async fn create_engine_with_multiple_object_stores(
@@ -195,9 +206,18 @@ impl TestEnv {
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
.await
.unwrap()
let data_home = self.data_home().display().to_string();
MitoEngine::new_for_test(
&data_home,
config,
logstore,
object_store_manager,
manager,
listener,
)
.await
.unwrap()
}
/// Reopen the engine.
@@ -205,6 +225,7 @@ impl TestEnv {
engine.stop().await.unwrap();
MitoEngine::new(
&self.data_home().display().to_string(),
config,
self.logstore.clone().unwrap(),
self.object_store_manager.clone().unwrap(),
@@ -216,6 +237,7 @@ impl TestEnv {
/// Open the engine.
pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine {
MitoEngine::new(
&self.data_home().display().to_string(),
config,
self.logstore.clone().unwrap(),
self.object_store_manager.clone().unwrap(),
@@ -231,9 +253,11 @@ impl TestEnv {
}
/// Creates a new [WorkerGroup] with specific config under this env.
pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
pub(crate) async fn create_worker_group(&self, mut config: MitoConfig) -> WorkerGroup {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
let data_home = self.data_home().display().to_string();
config.sanitize(&data_home).unwrap();
WorkerGroup::start(
Arc::new(config),
Arc::new(log_store),

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use tokio::sync::mpsc::Sender;
@@ -27,6 +28,7 @@ use crate::compaction::CompactionScheduler;
use crate::flush::FlushScheduler;
use crate::request::WorkerRequest;
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
/// Scheduler mocker.
pub(crate) struct SchedulerEnv {
@@ -39,15 +41,20 @@ pub(crate) struct SchedulerEnv {
impl SchedulerEnv {
/// Creates a new mocker.
pub(crate) fn new() -> SchedulerEnv {
pub(crate) async fn new() -> SchedulerEnv {
let path = create_temp_dir("");
let path_str = path.path().display().to_string();
let mut builder = Fs::default();
builder.root(path.path().to_str().unwrap());
builder.root(&path_str);
let intm_mgr = IntermediateManager::init_fs(join_dir(&path_str, "intm"))
.await
.unwrap();
let object_store = ObjectStore::new(builder).unwrap().finish();
let access_layer = Arc::new(AccessLayer::new("", object_store.clone()));
let access_layer = Arc::new(AccessLayer::new("", object_store.clone(), intm_mgr));
SchedulerEnv {
path: create_temp_dir(""),
path,
access_layer,
scheduler: None,
}

View File

@@ -55,6 +55,7 @@ use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::wal::Wal;
/// Identifier for a worker.
@@ -120,8 +121,15 @@ impl WorkerGroup {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
));
let intermediate_manager =
IntermediateManager::init_fs(&config.inverted_index.intermediate_path).await?;
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?;
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
intermediate_manager.clone(),
)
.await?;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
@@ -142,6 +150,7 @@ impl WorkerGroup {
scheduler: scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
}
.start()
})
@@ -222,7 +231,14 @@ impl WorkerGroup {
))
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?;
let intermediate_manager =
IntermediateManager::init_fs(&config.inverted_index.intermediate_path).await?;
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
intermediate_manager.clone(),
)
.await?;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
@@ -231,7 +247,6 @@ impl WorkerGroup {
.write_cache(write_cache)
.build(),
);
let workers = (0..config.num_workers)
.map(|id| {
WorkerStarter {
@@ -243,6 +258,7 @@ impl WorkerGroup {
scheduler: scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
}
.start()
})
@@ -263,6 +279,7 @@ fn value_to_index(value: usize, num_workers: usize) -> usize {
async fn write_cache_from_config(
config: &MitoConfig,
object_store_manager: ObjectStoreManagerRef,
intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
if !config.enable_experimental_write_cache {
return Ok(None);
@@ -275,6 +292,7 @@ async fn write_cache_from_config(
&config.experimental_write_cache_path,
object_store_manager,
config.experimental_write_cache_size,
intermediate_manager,
)
.await?;
Ok(Some(Arc::new(cache)))
@@ -290,6 +308,7 @@ struct WorkerStarter<S> {
scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
intermediate_manager: IntermediateManager,
}
impl<S: LogStore> WorkerStarter<S> {
@@ -323,6 +342,7 @@ impl<S: LogStore> WorkerStarter<S> {
stalled_requests: StalledRequests::default(),
listener: self.listener,
cache_manager: self.cache_manager,
intermediate_manager: self.intermediate_manager,
};
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
@@ -479,6 +499,8 @@ struct RegionWorkerLoop<S> {
listener: WorkerListener,
/// Cache.
cache_manager: CacheManagerRef,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}
impl<S: LogStore> RegionWorkerLoop<S> {

View File

@@ -54,6 +54,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.memtable_builder.clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
self.intermediate_manager.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())

View File

@@ -61,6 +61,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.memtable_builder.clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
self.intermediate_manager.clone(),
)
.metadata(metadata)
.parse_options(request.options)?

View File

@@ -68,6 +68,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.memtable_builder.clone(),
self.object_store_manager.clone(),
self.scheduler.clone(),
self.intermediate_manager.clone(),
)
.skip_wal_replay(request.skip_wal_replay)
.parse_options(request.options)?

View File

@@ -777,6 +777,13 @@ sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false
[datanode.region_engine.mito.inverted_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "64.0MiB"
intermediate_path = ""
[[datanode.region_engine]]
[datanode.region_engine.file]

View File

@@ -19,19 +19,14 @@ SELECT * from t1;
| 1970-01-01T00:00:00 | 0.0 | host1 |
+-------------------------+-----+-------+
-- TODO(ruihang): fix this. t2 should not contain data from t1
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
Affected Rows: 0
SELECT * from t2;
+-------------------------+-----+-----+
| ts | job | val |
+-------------------------+-----+-----+
| 1970-01-01T00:00:00.001 | | 1.0 |
| 1970-01-01T00:00:00 | | 0.0 |
+-------------------------+-----+-----+
++
++
INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1);
@@ -42,8 +37,6 @@ SELECT * from t2;
+-------------------------+------+-----+
| ts | job | val |
+-------------------------+------+-----+
| 1970-01-01T00:00:00.001 | | 1.0 |
| 1970-01-01T00:00:00 | | 0.0 |
| 1970-01-01T00:00:00.001 | job2 | 1.0 |
| 1970-01-01T00:00:00 | job1 | 0.0 |
+-------------------------+------+-----+

View File

@@ -6,7 +6,6 @@ INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2');
SELECT * from t1;
-- TODO(ruihang): fix this. t2 should not contain data from t1
CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy");
SELECT * from t2;

View File

@@ -322,7 +322,7 @@ impl Env {
}
}
/// Setup kafka wal cluster if needed. The conterpart is in [GreptimeDB::stop].
/// Setup kafka wal cluster if needed. The counterpart is in [GreptimeDB::stop].
fn setup_wal(&self) {
if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) {
util::setup_wal();