From f712c1b3569904e4ca75bb0fb9352c1cb376e6e1 Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 27 Jun 2025 20:11:28 +0800
Subject: [PATCH] feat: cherry-pick #6384 #6388 #6396 #6403 #6412 #6405 to 0.15 branch (#6414)

* feat: supports CsvWithNames and CsvWithNamesAndTypes formats (#6384)

* feat: supports CsvWithNames and CsvWithNamesAndTypes formats and object/array types

Signed-off-by: Dennis Zhuang

* test: added and fixed tests

Signed-off-by: Dennis Zhuang

* chore: fix test

Signed-off-by: Dennis Zhuang

* chore: remove comments

Signed-off-by: Dennis Zhuang

* test: add json type csv tests

Signed-off-by: Dennis Zhuang

* chore: remove comment

Co-authored-by: Yingwen

---------

Signed-off-by: Dennis Zhuang
Co-authored-by: Yingwen
Signed-off-by: evenyag

* feat: introduce /v1/health for healthcheck from external (#6388)

Signed-off-by: Ning Sun
Signed-off-by: evenyag

* feat: update dashboard to v0.10.1 (#6396)

Co-authored-by: ZonaHex
Signed-off-by: evenyag

* fix: complete partial index search results in cache (#6403)

* fix: complete partial index search results in cache

Signed-off-by: Zhenchi

* polish

Signed-off-by: Zhenchi

* address comments

Signed-off-by: Zhenchi

* add initial tests

Signed-off-by: Zhenchi

* cover issue case

Signed-off-by: Zhenchi

* TestEnv new -> async

Signed-off-by: Zhenchi

---------

Signed-off-by: Zhenchi
Signed-off-by: evenyag

* fix: skip failing nodes when gathering process info (#6412)

* fix/process-manager-skip-fail-nodes:

- **Enhance Error Handling in `process_manager.rs`:** Improved error handling by adding a warning log for failing nodes in the `list_process` method. This ensures that the process listing continues even if some nodes fail to respond.

- **Add Error Type Import in `process_manager.rs`:** Included the `Error` type from the `error` module to handle errors more effectively within the `ProcessManager` implementation.

Signed-off-by: Lei, HUANG

* fix: clippy

Signed-off-by: Lei, HUANG

* fix/process-manager-skip-fail-nodes:

**Enhancements to Debugging and Trait Implementation**

- **`process_manager.rs`**: Improved logging by adding more detailed error messages when skipping failing nodes.

- **`selector.rs`**: Enhanced the `FrontendClient` trait by adding the `Debug` trait bound to improve debugging capabilities.

Signed-off-by: Lei, HUANG

---------

Signed-off-by: Lei, HUANG
Signed-off-by: evenyag

* refactor: pass pipeline name through http header and get db from query context (#6405)

Signed-off-by: zyy17
Signed-off-by: evenyag

---------

Signed-off-by: Dennis Zhuang
Signed-off-by: evenyag
Signed-off-by: Ning Sun
Signed-off-by: Zhenchi
Signed-off-by: Lei, HUANG
Signed-off-by: zyy17
Co-authored-by: dennis zhuang
Co-authored-by: Ning Sun
Co-authored-by: ZonaHe
Co-authored-by: ZonaHex
Co-authored-by: Zhenchi
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: zyy17
---
 src/catalog/src/process_manager.rs          | 18 +-
 src/common/frontend/src/selector.rs         | 3 +-
 src/datanode/src/alive_keeper.rs            | 2 +-
 src/datanode/src/heartbeat/handler.rs       | 8 +-
 src/metric-engine/src/test_util.rs          | 2 +-
 src/mito2/src/cache/index/inverted_index.rs | 2 +-
 src/mito2/src/cache/index/result_cache.rs   | 10 +
 src/mito2/src/cache/write_cache.rs          | 6 +-
 src/mito2/src/engine/alter_test.rs          | 16 +-
 src/mito2/src/engine/append_mode_test.rs    | 4 +-
 src/mito2/src/engine/basic_test.rs          | 28 +-
 src/mito2/src/engine/batch_open_test.rs     | 10 +-
 src/mito2/src/engine/catchup_test.rs        | 25 +-
 src/mito2/src/engine/close_test.rs          | 2 +-
 src/mito2/src/engine/compaction_test.rs     | 18 +-
 src/mito2/src/engine/create_test.rs         | 18 +-
 src/mito2/src/engine/drop_test.rs           | 4 +-
 src/mito2/src/engine/edit_region_test.rs    | 6 +-
 src/mito2/src/engine/filter_deleted_test.rs | 2 +-
 src/mito2/src/engine/flush_test.rs          | 16 +-
 src/mito2/src/engine/merge_mode_test.rs     | 4 +-
 src/mito2/src/engine/open_test.rs           | 20 +-
 src/mito2/src/engine/parallel_test.rs       | 2 +-
 src/mito2/src/engine/projection_test.rs     | 2 +-
 src/mito2/src/engine/prune_test.rs          | 8 +-
 src/mito2/src/engine/row_selector_test.rs   | 2 +-
 src/mito2/src/engine/scan_test.rs           | 4 +-
 src/mito2/src/engine/set_role_state_test.rs | 6 +-
 src/mito2/src/engine/sync_test.rs           | 4 +-
 src/mito2/src/engine/truncate_test.rs       | 10 +-
 src/mito2/src/manifest/manager.rs           | 8 +-
 src/mito2/src/manifest/tests/checkpoint.rs  | 2 +-
 .../src/sst/index/bloom_filter/applier.rs   | 7 +-
 .../src/sst/index/fulltext_index/applier.rs | 6 +-
 .../src/sst/index/fulltext_index/creator.rs | 1 +
 src/mito2/src/sst/index/indexer/finish.rs   | 37 +-
 src/mito2/src/sst/parquet.rs                | 326 +++++++++++++++++-
 src/mito2/src/sst/parquet/reader.rs         | 180 ++++++----
 src/mito2/src/sst/parquet/row_selection.rs  | 92 ++++-
 src/mito2/src/test_util.rs                  | 93 +++--
 src/mito2/src/test_util/sst_util.rs         | 9 +-
 src/mito2/src/worker.rs                     | 2 +-
 src/servers/dashboard/VERSION               | 2 +-
 src/servers/src/elasticsearch.rs            | 16 +-
 src/servers/src/http.rs                     | 73 +++-
 src/servers/src/http/handler.rs             | 8 +-
 src/servers/src/http/result/csv_result.rs   | 172 ++++++++-
 src/servers/tests/http/http_handler_test.rs | 4 +-
 tests-integration/tests/http.rs             | 25 +-
 49 files changed, 1006 insertions(+), 319 deletions(-)

diff --git a/src/catalog/src/process_manager.rs b/src/catalog/src/process_manager.rs index ff2db26f46..9ee6744323 100644 --- a/src/catalog/src/process_manager.rs +++ b/src/catalog/src/process_manager.rs @@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock}; use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo}; use common_base::cancellation::CancellationHandle; use common_frontend::selector::{FrontendSelector, MetaClientSelector}; -use common_telemetry::{debug, info}; +use common_telemetry::{debug, info, warn}; use common_time::util::current_time_millis; use meta_client::MetaClientRef; use snafu::{ensure, OptionExt, ResultExt}; @@ -141,14 +141,20 @@ impl
ProcessManager { .await .context(error::InvokeFrontendSnafu)?; for mut f in frontends { - processes.extend( - f.list_process(ListProcessRequest { + let result = f + .list_process(ListProcessRequest { catalog: catalog.unwrap_or_default().to_string(), }) .await - .context(error::InvokeFrontendSnafu)? - .processes, - ); + .context(error::InvokeFrontendSnafu); + match result { + Ok(resp) => { + processes.extend(resp.processes); + } + Err(e) => { + warn!(e; "Skipping failing node: {:?}", f) + } + } } } processes.extend(self.local_processes(catalog)?); diff --git a/src/common/frontend/src/selector.rs b/src/common/frontend/src/selector.rs index 3536ec85d8..e70f622fa0 100644 --- a/src/common/frontend/src/selector.rs +++ b/src/common/frontend/src/selector.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -30,7 +31,7 @@ use crate::error::{MetaSnafu, Result}; pub type FrontendClientPtr = Box; #[async_trait::async_trait] -pub trait FrontendClient: Send { +pub trait FrontendClient: Send + Debug { async fn list_process(&mut self, req: ListProcessRequest) -> Result; async fn kill_process(&mut self, req: KillProcessRequest) -> Result; diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 2c3a9dd1fa..5a3686ac66 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -475,7 +475,7 @@ mod test { async fn region_alive_keeper() { common_telemetry::init_default_ut_logging(); let mut region_server = mock_region_server(); - let mut engine_env = TestEnv::with_prefix("region-alive-keeper"); + let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await; let engine = engine_env.create_engine(MitoConfig::default()).await; let engine = Arc::new(engine); region_server.register_engine(engine.clone()); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 1aff1b7f47..a9211107fb 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -278,7 +278,7 @@ mod tests { let mut region_server = mock_region_server(); let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); - let mut engine_env = TestEnv::with_prefix("close-region"); + let mut engine_env = TestEnv::with_prefix("close-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; region_server.register_engine(Arc::new(engine)); let region_id = RegionId::new(1024, 1); @@ -326,7 +326,7 @@ mod tests { let mut region_server = mock_region_server(); let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); - let mut engine_env = TestEnv::with_prefix("open-region"); + let mut engine_env = TestEnv::with_prefix("open-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; region_server.register_engine(Arc::new(engine)); let region_id = RegionId::new(1024, 1); @@ -374,7 +374,7 @@ mod tests { let mut region_server = mock_region_server(); let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); - let mut engine_env = TestEnv::with_prefix("open-not-exists-region"); + let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; region_server.register_engine(Arc::new(engine)); let region_id = RegionId::new(1024, 1); @@ -406,7 
+406,7 @@ mod tests { let mut region_server = mock_region_server(); let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone()); - let mut engine_env = TestEnv::with_prefix("downgrade-region"); + let mut engine_env = TestEnv::with_prefix("downgrade-region").await; let engine = engine_env.create_engine(MitoConfig::default()).await; region_server.register_engine(Arc::new(engine)); let region_id = RegionId::new(1024, 1); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index e750b516f1..dc16db4bb4 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -59,7 +59,7 @@ impl TestEnv { /// Returns a new env with specific `prefix` and `config` for test. pub async fn with_prefix_and_config(prefix: &str, config: EngineConfig) -> Self { - let mut mito_env = MitoTestEnv::with_prefix(prefix); + let mut mito_env = MitoTestEnv::with_prefix(prefix).await; let mito = mito_env.create_engine(MitoConfig::default()).await; let metric = MetricEngine::try_new(mito.clone(), config).unwrap(); Self { diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 75cfe16d52..10a9f96307 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -245,7 +245,7 @@ mod test { let blob = create_inverted_index_blob().await; // Init a test range reader in local fs. - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let file_size = blob.len() as u64; let store = env.init_object_store_manager(); let temp_path = "data"; diff --git a/src/mito2/src/cache/index/result_cache.rs b/src/mito2/src/cache/index/result_cache.rs index b82788763d..ee6bd44b41 100644 --- a/src/mito2/src/cache/index/result_cache.rs +++ b/src/mito2/src/cache/index/result_cache.rs @@ -31,6 +31,11 @@ use crate::sst::parquet::row_selection::RowGroupSelection; const INDEX_RESULT_TYPE: &str = "index_result"; /// Cache for storing index query results. +/// +/// The `RowGroupSelection` is a collection of row groups that match the predicate. +/// +/// Row groups can be partially searched. Row groups that are not contained in the `RowGroupSelection` have not been searched. +/// Users can retrieve the partial results and then handle the remaining row groups required by the predicate. pub struct IndexResultCache { cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>, } @@ -64,6 +69,8 @@ impl IndexResultCache { } /// Puts a query result into the cache. + /// + /// Allows putting a partial result (one that does not contain all row groups) into the cache. pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) { let key = (key, file_id); let size = Self::index_result_cache_weight(&key, &result); @@ -74,6 +81,9 @@ impl IndexResultCache { } /// Gets a query result from the cache. + /// + /// Note: the returned `RowGroupSelection` only contains the row groups that have been searched. + /// The caller should then handle the remaining row groups required by the predicate.
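+    ///
+    /// A minimal usage sketch (illustrative only; `pred_key` and the fallback
+    /// full search are assumptions at the call site, not part of this patch):
+    ///
+    /// ```ignore
+    /// match cache.get(&pred_key, file_id) {
+    ///     Some(selection) => {
+    ///         // `selection` may cover only some row groups; search the row
+    ///         // groups it does not contain and merge the two results.
+    ///     }
+    ///     None => { /* fall back to a full index search */ }
+    /// }
+    /// ```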
pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option> { let res = self.cache.get(&(key.clone(), file_id)); if res.is_some() { diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 458acc968f..d940867a5e 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -449,7 +449,7 @@ mod tests { async fn test_write_and_upload_sst() { // TODO(QuenKar): maybe find a way to create some object server for testing, // and now just use local file system to mock. - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let mock_store = env.init_object_store_manager(); let path_provider = RegionFilePathFactory::new("test".to_string()); @@ -537,7 +537,7 @@ mod tests { #[tokio::test] async fn test_read_metadata_from_write_cache() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let data_home = env.data_home().display().to_string(); let mock_store = env.init_object_store_manager(); @@ -606,7 +606,7 @@ mod tests { #[tokio::test] async fn test_write_cache_clean_tmp_files() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let data_home = env.data_home().display().to_string(); let mock_store = env.init_object_store_manager(); diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index e14dd83c40..0863d3a7bd 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -115,7 +115,7 @@ fn check_region_version( async fn test_alter_region() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -211,7 +211,7 @@ fn build_rows_for_tags( #[tokio::test] async fn test_put_after_alter() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -316,7 +316,7 @@ async fn test_put_after_alter() { async fn test_alter_region_retry() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -374,7 +374,7 @@ async fn test_alter_region_retry() { async fn test_alter_on_flushing() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env .create_engine_with(MitoConfig::default(), None, Some(listener.clone())) @@ -478,7 +478,7 @@ async fn test_alter_on_flushing() { async fn test_alter_column_fulltext_options() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env .create_engine_with(MitoConfig::default(), None, Some(listener.clone())) @@ -597,7 +597,7 @@ async fn test_alter_column_fulltext_options() { async fn test_alter_column_set_inverted_index() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env .create_engine_with(MitoConfig::default(), None, 
Some(listener.clone())) @@ -707,7 +707,7 @@ async fn test_alter_column_set_inverted_index() { async fn test_alter_region_ttl_options() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env .create_engine_with(MitoConfig::default(), None, Some(listener.clone())) @@ -757,7 +757,7 @@ async fn test_alter_region_ttl_options() { async fn test_write_stall_on_altering() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let listener = Arc::new(NotifyRegionChangeResultListener::default()); let engine = env .create_engine_with(MitoConfig::default(), None, Some(listener.clone())) diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 1374edd822..bfadecc1f4 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -31,7 +31,7 @@ use crate::test_util::{ async fn test_append_mode_write_query() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -89,7 +89,7 @@ async fn test_append_mode_write_query() { #[tokio::test] async fn test_append_mode_compaction() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { ..Default::default() diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index dd4962a6b8..21e9a4837f 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -42,7 +42,7 @@ use crate::test_util::{ #[tokio::test] async fn test_engine_new_stop() { - let mut env = TestEnv::with_prefix("engine-stop"); + let mut env = TestEnv::with_prefix("engine-stop").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -69,7 +69,7 @@ async fn test_engine_new_stop() { #[tokio::test] async fn test_write_to_region() { - let mut env = TestEnv::with_prefix("write-to-region"); + let mut env = TestEnv::with_prefix("write-to-region").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -97,7 +97,9 @@ async fn test_region_replay(factory: Option) { let Some(factory) = factory else { return; }; - let mut env = TestEnv::with_prefix("region-replay").with_log_store_factory(factory.clone()); + let mut env = TestEnv::with_prefix("region-replay") + .await + .with_log_store_factory(factory.clone()); let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -173,7 +175,7 @@ async fn test_region_replay(factory: Option) { #[tokio::test] async fn test_write_query_region() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -207,7 +209,7 @@ async fn test_write_query_region() { #[tokio::test] async fn test_different_order() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -268,7 +270,7 @@ async fn test_different_order() { #[tokio::test] async fn test_different_order_and_type() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = 
env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -332,7 +334,7 @@ async fn test_different_order_and_type() { async fn test_put_delete() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -384,7 +386,7 @@ async fn test_put_delete() { #[tokio::test] async fn test_delete_not_null_fields() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -433,7 +435,7 @@ async fn test_delete_not_null_fields() { #[tokio::test] async fn test_put_overwrite() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -493,7 +495,7 @@ async fn test_put_overwrite() { #[tokio::test] async fn test_absent_and_invalid_columns() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -541,7 +543,7 @@ async fn test_absent_and_invalid_columns() { #[tokio::test] async fn test_region_usage() { - let mut env = TestEnv::with_prefix("region_usage"); + let mut env = TestEnv::with_prefix("region_usage").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -595,7 +597,7 @@ async fn test_region_usage() { async fn test_engine_with_write_cache() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let path = env.data_home().to_str().unwrap().to_string(); let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None); let engine = env.create_engine(mito_config).await; @@ -635,7 +637,7 @@ async fn test_engine_with_write_cache() { #[tokio::test] async fn test_cache_null_primary_key() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { vector_cache_size: ReadableSize::mb(32), diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs index 793e79ca56..891328781b 100644 --- a/src/mito2/src/engine/batch_open_test.rs +++ b/src/mito2/src/engine/batch_open_test.rs @@ -39,8 +39,9 @@ async fn test_batch_open(factory: Option) { let Some(factory) = factory else { return; }; - let mut env = - TestEnv::with_prefix("open-batch-regions").with_log_store_factory(factory.clone()); + let mut env = TestEnv::with_prefix("open-batch-regions") + .await + .with_log_store_factory(factory.clone()); let engine = env.create_engine(MitoConfig::default()).await; let topic = prepare_test_for_kafka_log_store(&factory).await; @@ -160,8 +161,9 @@ async fn test_batch_open_err(factory: Option) { let Some(factory) = factory else { return; }; - let mut env = - TestEnv::with_prefix("open-batch-regions-err").with_log_store_factory(factory.clone()); + let mut env = TestEnv::with_prefix("open-batch-regions-err") + .await + .with_log_store_factory(factory.clone()); let engine = env.create_engine(MitoConfig::default()).await; let topic = prepare_test_for_kafka_log_store(&factory).await; let mut options = HashMap::new(); diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index bb1c949d81..deb39e507b 100644 --- 
a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -57,7 +57,9 @@ async fn test_catchup_with_last_entry_id(factory: Option) { return; }; - let mut env = TestEnv::with_prefix("last_entry_id").with_log_store_factory(factory.clone()); + let mut env = TestEnv::with_prefix("last_entry_id") + .await + .with_log_store_factory(factory.clone()); let topic = prepare_test_for_kafka_log_store(&factory).await; let leader_engine = env.create_engine(MitoConfig::default()).await; let follower_engine = env.create_follower_engine(MitoConfig::default()).await; @@ -175,8 +177,9 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option) { return; }; - let mut env = - TestEnv::with_prefix("without_last_entry_id").with_log_store_factory(factory.clone()); + let mut env = TestEnv::with_prefix("without_last_entry_id") + .await + .with_log_store_factory(factory.clone()); let topic = prepare_test_for_kafka_log_store(&factory).await; let leader_engine = env.create_engine(MitoConfig::default()).await; let follower_engine = env.create_follower_engine(MitoConfig::default()).await; @@ -380,8 +384,9 @@ async fn test_catchup_with_manifest_update(factory: Option) { return; }; - let mut env = - TestEnv::with_prefix("without_manifest_update").with_log_store_factory(factory.clone()); + let mut env = TestEnv::with_prefix("without_manifest_update") + .await + .with_log_store_factory(factory.clone()); let topic = prepare_test_for_kafka_log_store(&factory).await; let leader_engine = env.create_engine(MitoConfig::default()).await; let follower_engine = env.create_follower_engine(MitoConfig::default()).await; @@ -545,7 +550,9 @@ async fn test_local_catchup(factory: Option) { return; }; - let mut env = TestEnv::with_prefix("local_catchup").with_log_store_factory(factory.clone()); + let mut env = TestEnv::with_prefix("local_catchup") + .await + .with_log_store_factory(factory.clone()); let leader_engine = env.create_engine(MitoConfig::default()).await; let Some(LogStoreImpl::RaftEngine(log_store)) = env.get_log_store() else { unreachable!() @@ -686,7 +693,7 @@ async fn test_local_catchup(factory: Option) { #[tokio::test] async fn test_catchup_not_exist() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let non_exist_region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/close_test.rs b/src/mito2/src/engine/close_test.rs index d6396e034a..81c9c47b7c 100644 --- a/src/mito2/src/engine/close_test.rs +++ b/src/mito2/src/engine/close_test.rs @@ -21,7 +21,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_engine_close_region() { - let mut env = TestEnv::with_prefix("close"); + let mut env = TestEnv::with_prefix("close").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index c92e04c78b..696bd44ad4 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -136,7 +136,7 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec { #[tokio::test] async fn test_compaction_region() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -202,7 +202,7 @@ async fn test_compaction_region() { #[tokio::test] async 
fn test_infer_compaction_time_window() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -341,7 +341,7 @@ async fn test_infer_compaction_time_window() { #[tokio::test] async fn test_compaction_overlapping_files() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -402,7 +402,7 @@ async fn test_compaction_overlapping_files() { #[tokio::test] async fn test_compaction_region_with_overlapping() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -450,7 +450,7 @@ async fn test_compaction_region_with_overlapping() { #[tokio::test] async fn test_compaction_region_with_overlapping_delete_all() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -506,7 +506,7 @@ async fn test_compaction_region_with_overlapping_delete_all() { #[tokio::test] async fn test_readonly_during_compaction() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let listener = Arc::new(CompactionListener::default()); let engine = env .create_engine_with( @@ -590,7 +590,7 @@ async fn test_readonly_during_compaction() { #[tokio::test] async fn test_compaction_update_time_window() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -686,7 +686,7 @@ async fn test_compaction_update_time_window() { #[tokio::test] async fn test_change_region_compaction_window() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -811,7 +811,7 @@ async fn test_change_region_compaction_window() { #[tokio::test] async fn test_open_overwrite_compaction_window() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index 4bcc559340..3b0880bef8 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -26,7 +26,7 @@ use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, #[tokio::test] async fn test_engine_create_new_region() { - let mut env = TestEnv::with_prefix("new-region"); + let mut env = TestEnv::with_prefix("new-region").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -41,7 +41,7 @@ async fn test_engine_create_new_region() { #[tokio::test] async fn test_engine_create_existing_region() { - let mut env = TestEnv::with_prefix("create-existing"); + let mut env = TestEnv::with_prefix("create-existing").await; let engine = 
env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -61,7 +61,7 @@ async fn test_engine_create_existing_region() { #[tokio::test] async fn test_engine_create_close_create_region() { // This test will trigger create_or_open function. - let mut env = TestEnv::with_prefix("create-close-create"); + let mut env = TestEnv::with_prefix("create-close-create").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -91,7 +91,7 @@ async fn test_engine_create_close_create_region() { #[tokio::test] async fn test_engine_create_with_different_id() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -110,7 +110,7 @@ async fn test_engine_create_with_different_id() { #[tokio::test] async fn test_engine_create_with_different_schema() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -130,7 +130,7 @@ async fn test_engine_create_with_different_schema() { #[tokio::test] async fn test_engine_create_with_different_primary_key() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -150,7 +150,7 @@ async fn test_engine_create_with_different_primary_key() { #[tokio::test] async fn test_engine_create_with_options() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -172,7 +172,7 @@ async fn test_engine_create_with_options() { #[tokio::test] async fn test_engine_create_with_custom_store() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env .create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"]) .await; @@ -204,7 +204,7 @@ async fn test_engine_create_with_custom_store() { #[tokio::test] async fn test_engine_create_with_memtable_opts() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 6c1056270c..6386cd20ab 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -35,7 +35,7 @@ use crate::worker::DROPPING_MARKER_FILE; async fn test_engine_drop_region() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::with_prefix("drop"); + let mut env = TestEnv::with_prefix("drop").await; let listener = Arc::new(DropListener::new(Duration::from_millis(100))); let engine = env .create_engine_with(MitoConfig::default(), None, Some(listener.clone())) @@ -143,7 +143,7 @@ async fn test_engine_drop_region_for_custom_store() { put_rows(engine, region_id, rows).await; flush_region(engine, region_id, None).await; } - let mut env = TestEnv::with_prefix("drop"); + let mut env = TestEnv::with_prefix("drop").await; let listener = Arc::new(DropListener::new(Duration::from_millis(100))); let engine = env .create_engine_with_multiple_object_stores( diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 2b66422f06..2a830ed331 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ 
b/src/mito2/src/engine/edit_region_test.rs @@ -33,7 +33,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_edit_region_schedule_compaction() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; struct EditRegionListener { tx: Mutex>>, @@ -122,7 +122,7 @@ async fn test_edit_region_schedule_compaction() { #[tokio::test] async fn test_edit_region_fill_cache() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; struct EditRegionListener { tx: Mutex>>, @@ -241,7 +241,7 @@ async fn test_edit_region_concurrently() { } } - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { // Suppress the compaction to not impede the speed of this kinda stress testing. diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index d1e2328541..8e856ce75d 100644 --- a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -28,7 +28,7 @@ use crate::test_util::{ async fn test_scan_without_filtering_deleted() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 03143118ac..7593299950 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -41,7 +41,7 @@ use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS; #[tokio::test] async fn test_manual_flush() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -91,7 +91,7 @@ async fn test_manual_flush() { #[tokio::test] async fn test_flush_engine() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushListener::default()); let engine = env @@ -161,7 +161,7 @@ async fn test_flush_engine() { #[tokio::test] async fn test_write_stall() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(StallListener::default()); let engine = env @@ -236,7 +236,7 @@ async fn test_write_stall() { #[tokio::test] async fn test_flush_empty() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let engine = env .create_engine_with( @@ -289,7 +289,7 @@ async fn test_flush_reopen_region(factory: Option) { return; }; - let mut env = TestEnv::new().with_log_store_factory(factory.clone()); + let mut env = TestEnv::new().await.with_log_store_factory(factory.clone()); let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -396,7 +396,7 @@ impl MockTimeProvider { #[tokio::test] async fn test_auto_flush_engine() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushListener::default()); let now = current_time_millis(); @@ -467,7 +467,7 @@ async fn test_auto_flush_engine() { #[tokio::test] async fn test_flush_workers() { - let mut env = TestEnv::new(); + let mut env = 
TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushListener::default()); let engine = env @@ -554,7 +554,7 @@ async fn test_update_topic_latest_entry_id(factory: Option) { let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushListener::default()); - let mut env = TestEnv::new().with_log_store_factory(factory.clone()); + let mut env = TestEnv::new().await.with_log_store_factory(factory.clone()); let engine = env .create_engine_with( MitoConfig::default(), diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs index 0bc0ee4ace..354b3d414d 100644 --- a/src/mito2/src/engine/merge_mode_test.rs +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -31,7 +31,7 @@ use crate::test_util::{ async fn test_merge_mode_write_query() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -89,7 +89,7 @@ async fn test_merge_mode_write_query() { async fn test_merge_mode_compaction() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { ..Default::default() diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 93f3963e0b..f28b6c7177 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -36,7 +36,7 @@ use crate::test_util::{ #[tokio::test] async fn test_engine_open_empty() { - let mut env = TestEnv::with_prefix("open-empty"); + let mut env = TestEnv::with_prefix("open-empty").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -63,7 +63,7 @@ async fn test_engine_open_empty() { #[tokio::test] async fn test_engine_open_existing() { - let mut env = TestEnv::with_prefix("open-exiting"); + let mut env = TestEnv::with_prefix("open-exiting").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -90,7 +90,7 @@ async fn test_engine_open_existing() { #[tokio::test] async fn test_engine_reopen_region() { - let mut env = TestEnv::with_prefix("reopen-region"); + let mut env = TestEnv::with_prefix("reopen-region").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -107,7 +107,7 @@ async fn test_engine_reopen_region() { #[tokio::test] async fn test_engine_open_readonly() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -150,7 +150,7 @@ async fn test_engine_open_readonly() { #[tokio::test] async fn test_engine_region_open_with_options() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -190,7 +190,7 @@ async fn test_engine_region_open_with_options() { #[tokio::test] async fn test_engine_region_open_with_custom_store() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env .create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"]) .await; @@ -244,7 +244,7 @@ async fn test_engine_region_open_with_custom_store() { #[tokio::test] async fn test_open_region_skip_wal_replay() { 
- let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -344,7 +344,7 @@ async fn test_open_region_skip_wal_replay() { #[tokio::test] async fn test_open_region_wait_for_opening_region_ok() { - let mut env = TestEnv::with_prefix("wait-for-opening-region-ok"); + let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); let worker = engine.inner.workers.worker(region_id); @@ -383,7 +383,7 @@ async fn test_open_region_wait_for_opening_region_ok() { #[tokio::test] async fn test_open_region_wait_for_opening_region_err() { - let mut env = TestEnv::with_prefix("wait-for-opening-region-err"); + let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); let worker = engine.inner.workers.worker(region_id); @@ -428,7 +428,7 @@ async fn test_open_region_wait_for_opening_region_err() { #[tokio::test] async fn test_open_compaction_region() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let mut mito_config = MitoConfig::default(); mito_config .sanitize(&env.data_home().display().to_string()) diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index 9c424d3172..460149d91f 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -73,7 +73,7 @@ async fn scan_in_parallel( #[tokio::test] async fn test_parallel_scan() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index ef1f180e9e..90b8a70e46 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -53,7 +53,7 @@ fn build_rows_multi_tags_fields( #[tokio::test] async fn test_scan_projection() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index c0f8eb6ffb..6d774d8b4d 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -27,7 +27,7 @@ use crate::test_util::{ }; async fn check_prune_row_groups(exprs: Vec, expected: &str) { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -147,7 +147,7 @@ fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr { #[tokio::test] async fn test_prune_memtable() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -221,7 +221,7 @@ async fn test_prune_memtable() { #[tokio::test] async fn test_prune_memtable_complex_expr() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -274,7 +274,7 @@ async fn test_prune_memtable_complex_expr() { #[tokio::test] async fn test_mem_range_prune() { - let mut env = TestEnv::new(); + let mut env = 
TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/row_selector_test.rs b/src/mito2/src/engine/row_selector_test.rs index dcff07d525..44e3ff66b6 100644 --- a/src/mito2/src/engine/row_selector_test.rs +++ b/src/mito2/src/engine/row_selector_test.rs @@ -25,7 +25,7 @@ use crate::test_util::{ }; async fn test_last_row(append_mode: bool) { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index aee5873bb2..4391bbabba 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -27,7 +27,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_scan_with_min_sst_sequence() { - let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence"); + let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -153,7 +153,7 @@ async fn test_scan_with_min_sst_sequence() { #[tokio::test] async fn test_series_scan() { - let mut env = TestEnv::with_prefix("test_series_scan"); + let mut env = TestEnv::with_prefix("test_series_scan").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/set_role_state_test.rs b/src/mito2/src/engine/set_role_state_test.rs index 93dbc69407..1bb5346b15 100644 --- a/src/mito2/src/engine/set_role_state_test.rs +++ b/src/mito2/src/engine/set_role_state_test.rs @@ -32,7 +32,7 @@ async fn test_set_role_state_gracefully() { SettableRegionRoleState::DowngradingLeader, ]; for settable_role_state in settable_role_states { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); @@ -101,7 +101,7 @@ async fn test_set_role_state_gracefully() { #[tokio::test] async fn test_set_role_state_gracefully_not_exist() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let non_exist_region_id = RegionId::new(1, 1); @@ -116,7 +116,7 @@ async fn test_set_role_state_gracefully_not_exist() { #[tokio::test] async fn test_write_downgrading_region() { - let mut env = TestEnv::with_prefix("write-to-downgrading-region"); + let mut env = TestEnv::with_prefix("write-to-downgrading-region").await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 842227670c..9e0eb2cd01 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -71,7 +71,7 @@ async fn scan_check( #[tokio::test] async fn test_sync_after_flush_region() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -163,7 +163,7 @@ async fn test_sync_after_flush_region() { async fn test_sync_after_alter_region() { common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let engine = 
env.create_engine(MitoConfig::default()).await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 5278c0208a..7a5823ed9f 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -33,7 +33,7 @@ use crate::test_util::{ #[tokio::test] async fn test_engine_truncate_region_basic() { - let mut env = TestEnv::with_prefix("truncate-basic"); + let mut env = TestEnv::with_prefix("truncate-basic").await; let engine = env.create_engine(MitoConfig::default()).await; // Create the region. @@ -83,7 +83,7 @@ async fn test_engine_truncate_region_basic() { #[tokio::test] async fn test_engine_put_data_after_truncate() { - let mut env = TestEnv::with_prefix("truncate-put"); + let mut env = TestEnv::with_prefix("truncate-put").await; let engine = env.create_engine(MitoConfig::default()).await; // Create the region. @@ -146,7 +146,7 @@ async fn test_engine_put_data_after_truncate() { #[tokio::test] async fn test_engine_truncate_after_flush() { - let mut env = TestEnv::with_prefix("truncate-flush"); + let mut env = TestEnv::with_prefix("truncate-flush").await; let engine = env.create_engine(MitoConfig::default()).await; // Create the region. @@ -223,7 +223,7 @@ async fn test_engine_truncate_after_flush() { #[tokio::test] async fn test_engine_truncate_reopen() { - let mut env = TestEnv::with_prefix("truncate-reopen"); + let mut env = TestEnv::with_prefix("truncate-reopen").await; let engine = env.create_engine(MitoConfig::default()).await; // Create the region. @@ -282,7 +282,7 @@ async fn test_engine_truncate_reopen() { #[tokio::test] async fn test_engine_truncate_during_flush() { init_default_ut_logging(); - let mut env = TestEnv::with_prefix("truncate-during-flush"); + let mut env = TestEnv::with_prefix("truncate-during-flush").await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushTruncateListener::default()); let engine = env diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 2590d7ae6c..b5f43a9984 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -594,7 +594,7 @@ mod test { #[tokio::test] async fn create_manifest_manager() { let metadata = Arc::new(basic_region_metadata()); - let env = TestEnv::new(); + let env = TestEnv::new().await; let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await @@ -606,7 +606,7 @@ mod test { #[tokio::test] async fn open_manifest_manager() { - let env = TestEnv::new(); + let env = TestEnv::new().await; // Try to opens an empty manifest. 
assert!(env .create_manifest_manager(CompressionType::Uncompressed, 10, None) .await @@ -637,7 +637,7 @@ mod test { #[tokio::test] async fn region_change_add_column() { let metadata = Arc::new(basic_region_metadata()); - let env = TestEnv::new(); + let env = TestEnv::new().await; let mut manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await .unwrap(); @@ -696,7 +696,7 @@ mod test { let metadata = Arc::new(basic_region_metadata()); let data_home = create_temp_dir(""); let data_home_path = data_home.path().to_str().unwrap().to_string(); - let env = TestEnv::with_data_home(data_home); + let env = TestEnv::with_data_home(data_home).await; let manifest_dir = format!("{}/manifest", data_home_path); diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 2ebf7cd5bf..a849853526 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -34,7 +34,7 @@ async fn build_manager( compress_type: CompressionType, ) -> (TestEnv, RegionManifestManager) { let metadata = Arc::new(basic_region_metadata()); - let env = TestEnv::new(); + let env = TestEnv::new().await; let manager = env .create_manifest_manager(compress_type, checkpoint_distance, Some(metadata.clone())) .await diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 6f7c587397..efb9af4fe2 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -128,6 +128,9 @@ impl BloomFilterIndexApplier { /// list of row group ranges that match the predicates. /// /// The `row_groups` iterator provides the row group lengths and whether to search in the row group. + /// + /// A row group id present in the returned result means that the row group has been searched. + /// Empty ranges mean that the row group was searched but no rows were found. pub async fn apply( &self, file_id: FileId, @@ -195,7 +198,6 @@ impl BloomFilterIndexApplier { range.end -= start; } } - output.retain(|(_, ranges)| !ranges.is_empty()); Ok(output) } @@ -386,6 +388,9 @@ mod tests { .apply(file_id, None, row_groups.into_iter()) .await .unwrap() + .into_iter() + .filter(|(_, ranges)| !ranges.is_empty()) + .collect() }) } } diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 20a15937ac..03235f3e7e 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -231,6 +231,9 @@ impl FulltextIndexApplier { impl FulltextIndexApplier { /// Applies coarse-grained fulltext index to the specified SST file. /// Returns (row group id -> ranges) that match the queries. + /// + /// A row group id present in the returned result means that the row group has been searched. + /// Empty ranges mean that the row group was searched but no rows were found. pub async fn apply_coarse( &self, file_id: FileId, @@ -367,7 +370,7 @@ impl FulltextIndexApplier { /// Adjusts the coarse output. Makes the output ranges based on row group start. fn adjust_coarse_output( input: Vec<(usize, Range<usize>)>, - output: &mut Vec<(usize, Vec<Range<usize>>)>, + output: &mut [(usize, Vec<Range<usize>>)], ) { // adjust ranges to be based on row group for ((_, output), (_, input)) in output.iter_mut().zip(input) { @@ -377,7 +380,6 @@ impl FulltextIndexApplier { range.end -= start; } } - output.retain(|(_, ranges)| !ranges.is_empty()); } /// Converts terms to predicates.
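With `output.retain(|(_, ranges)| !ranges.is_empty())` removed from both appliers above, a row group that was searched but matched nothing now stays in the output with an empty range list; only row groups the index never visited are absent. A minimal sketch of consuming such an output (a hypothetical standalone function, not code from this patch; the `Vec<(usize, Vec<Range<usize>>)>` shape follows the signatures above and the pruning decisions are illustrative):

```rust
use std::ops::Range;

/// Interprets the `(row_group_id, ranges)` pairs returned by an index applier.
fn interpret(output: Vec<(usize, Vec<Range<usize>>)>, total_row_groups: usize) {
    let searched: Vec<usize> = output.iter().map(|(id, _)| *id).collect();
    for (_row_group_id, ranges) in &output {
        if ranges.is_empty() {
            // Searched, no matches: this row group can be pruned entirely.
        } else {
            // Searched, with matches: read only `ranges` from this row group.
        }
    }
    // Row group ids absent from `output` were never searched; the caller must
    // still evaluate the predicate for them (e.g. a full scan or later search).
    let _unsearched = (0..total_row_groups).filter(|id| !searched.contains(id));
}
```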
diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 07da9f3b47..52bc34bb3b 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -625,6 +625,7 @@ mod tests { .unwrap(); resp.map(|r| { r.into_iter() + .filter(|(_, ranges)| !ranges.is_empty()) .map(|(row_group_id, _)| row_group_id as RowId) .collect() }) diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index ce00be0ae0..56da128934 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -14,10 +14,8 @@ use common_telemetry::{debug, warn}; use puffin::puffin_manager::{PuffinManager, PuffinWriter}; +use store_api::storage::ColumnId; -use crate::sst::index::bloom_filter::creator::BloomFilterIndexer; -use crate::sst::index::fulltext_index::creator::FulltextIndexer; -use crate::sst::index::inverted_index::creator::InvertedIndexer; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; use crate::sst::index::{ @@ -113,13 +111,14 @@ impl Indexer { return true; }; + let column_ids = indexer.column_ids().collect(); let err = match indexer.finish(puffin_writer).await { Ok((row_count, byte_count)) => { self.fill_inverted_index_output( &mut index_output.inverted_index, row_count, byte_count, - &indexer, + column_ids, ); return true; } @@ -150,13 +149,14 @@ impl Indexer { return true; }; + let column_ids = indexer.column_ids().collect(); let err = match indexer.finish(puffin_writer).await { Ok((row_count, byte_count)) => { self.fill_fulltext_index_output( &mut index_output.fulltext_index, row_count, byte_count, - &indexer, + column_ids, ); return true; } @@ -187,13 +187,14 @@ impl Indexer { return true; }; + let column_ids = indexer.column_ids().collect(); let err = match indexer.finish(puffin_writer).await { Ok((row_count, byte_count)) => { self.fill_bloom_filter_output( &mut index_output.bloom_filter, row_count, byte_count, - &indexer, + column_ids, ); return true; } @@ -220,16 +221,16 @@ impl Indexer { output: &mut InvertedIndexOutput, row_count: RowCount, byte_count: ByteCount, - indexer: &InvertedIndexer, + column_ids: Vec, ) { debug!( - "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}", - self.region_id, self.file_id, byte_count, row_count + "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}", + self.region_id, self.file_id, byte_count, row_count, column_ids ); output.index_size = byte_count; output.row_count = row_count; - output.columns = indexer.column_ids().collect(); + output.columns = column_ids; } fn fill_fulltext_index_output( @@ -237,16 +238,16 @@ impl Indexer { output: &mut FulltextIndexOutput, row_count: RowCount, byte_count: ByteCount, - indexer: &FulltextIndexer, + column_ids: Vec, ) { debug!( - "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}", - self.region_id, self.file_id, byte_count, row_count + "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}", + self.region_id, self.file_id, byte_count, row_count, column_ids ); output.index_size = byte_count; output.row_count = row_count; - output.columns = indexer.column_ids().collect(); + output.columns = column_ids; } fn fill_bloom_filter_output( @@ -254,15 +255,15 @@ impl Indexer { output: &mut BloomFilterOutput, 
row_count: RowCount, byte_count: ByteCount, - indexer: &BloomFilterIndexer, + column_ids: Vec<ColumnId>, ) { debug!( - "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}", - self.region_id, self.file_id, byte_count, row_count + "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}", + self.region_id, self.file_id, byte_count, row_count, column_ids ); output.index_size = byte_count; output.row_count = row_count; - output.columns = indexer.column_ids().collect(); + output.columns = column_ids; } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 93cf797dae..0a4c3e2b62 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -88,11 +88,12 @@ pub struct SstInfo { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::sync::Arc; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; - use datafusion_expr::{BinaryExpr, Expr, Operator}; + use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator}; use datatypes::arrow; use datatypes::arrow::array::RecordBatch; use datatypes::arrow::datatypes::{DataType, Field, Schema}; @@ -104,12 +105,17 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; - use crate::access_layer::{FilePathProvider, RegionFilePathFactory}; + use crate::access_layer::{FilePathProvider, OperationType, RegionFilePathFactory}; use crate::cache::{CacheManager, CacheStrategy, PageKey}; use crate::read::BatchReader; - use crate::sst::index::{Indexer, IndexerBuilder}; + use crate::region::options::{IndexOptions, InvertedIndexOptions}; + use crate::sst::file::{FileHandle, FileMeta}; + use crate::sst::file_purger::NoopFilePurger; + use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder; + use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; + use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl}; use crate::sst::parquet::format::WriteFormat; - use crate::sst::parquet::reader::ParquetReaderBuilder; + use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics}; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY}; use crate::test_util::sst_util::{ @@ -147,7 +153,7 @@ mod tests { #[tokio::test] async fn test_write_read() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let file_path = FixedPathProvider { @@ -205,7 +211,7 @@ mod tests { #[tokio::test] async fn test_read_with_cache() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); @@ -275,7 +281,7 @@ mod tests { #[tokio::test] async fn test_parquet_metadata_eq() { // create test env - let mut env = crate::test_util::TestEnv::new(); + let mut env = crate::test_util::TestEnv::new().await; let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); @@ -318,7 +324,7 @@ mod tests { #[tokio::test] async fn test_read_with_tag_filter() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); @@ -370,7 +376,7 @@ mod tests {
#[tokio::test] async fn test_read_empty_batch() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); @@ -407,7 +413,7 @@ mod tests { #[tokio::test] async fn test_read_with_field_filter() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); @@ -453,7 +459,7 @@ mod tests { #[tokio::test] async fn test_read_large_binary() { - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let file_path = handle.file_path(FILE_DIR); @@ -544,7 +550,7 @@ mod tests { async fn test_write_multiple_files() { common_telemetry::init_default_ut_logging(); // create test env - let mut env = TestEnv::new(); + let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let metadata = Arc::new(sst_region_metadata()); let batches = &[ @@ -593,4 +599,300 @@ mod tests { } assert_eq!(total_rows, rows_read); } + + #[tokio::test] + async fn test_write_read_with_index() { + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string()); + let metadata = Arc::new(sst_region_metadata()); + let row_group_size = 50; + + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 20), + new_batch_by_range(&["b", "d"], 0, 20), + new_batch_by_range(&["c", "d"], 0, 20), + new_batch_by_range(&["c", "f"], 0, 40), + new_batch_by_range(&["c", "h"], 100, 200), + ]); + // Use a small row group size for test. 
+ let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + + let puffin_manager = env + .get_puffin_manager() + .build(object_store.clone(), file_path.clone()); + let intermediate_manager = env.get_intermediate_manager(); + + let indexer_builder = IndexerBuilderImpl { + op_type: OperationType::Flush, + metadata: metadata.clone(), + row_group_size, + puffin_manager, + intermediate_manager, + index_options: IndexOptions { + inverted_index: InvertedIndexOptions { + segment_row_count: 1, + ..Default::default() + }, + }, + inverted_index_config: Default::default(), + fulltext_index_config: Default::default(), + bloom_filter_index_config: Default::default(), + }; + + let mut writer = ParquetWriter::new_with_object_store( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path.clone(), + ) + .await; + + let info = writer + .write_all(source, None, &write_opts) + .await + .unwrap() + .remove(0); + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert!(info.index_metadata.file_size > 0); + + assert!(info.index_metadata.inverted_index.index_size > 0); + assert_eq!(info.index_metadata.inverted_index.row_count, 200); + assert_eq!(info.index_metadata.inverted_index.columns, vec![0]); + + assert!(info.index_metadata.bloom_filter.index_size > 0); + assert_eq!(info.index_metadata.bloom_filter.row_count, 200); + assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]); + + assert_eq!( + ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(199) + ), + info.time_range + ); + + let handle = FileHandle::new( + FileMeta { + region_id: metadata.region_id, + file_id: info.file_id, + time_range: info.time_range, + level: 0, + file_size: info.file_size, + available_indexes: info.index_metadata.build_available_indexes(), + index_file_size: info.index_metadata.file_size, + num_row_groups: info.num_row_groups, + num_rows: info.num_rows as u64, + sequence: None, + }, + Arc::new(NoopFilePurger), + ); + + let cache = Arc::new( + CacheManager::builder() + .index_result_cache_size(1024 * 1024) + .index_metadata_size(1024 * 1024) + .index_content_page_size(1024 * 1024) + .index_content_size(1024 * 1024) + .puffin_metadata_size(1024 * 1024) + .build(), + ); + let index_result_cache = cache.index_result_cache().unwrap(); + + let build_inverted_index_applier = |exprs: &[Expr]| { + InvertedIndexApplierBuilder::new( + FILE_DIR.to_string(), + object_store.clone(), + &metadata, + HashSet::from_iter([0]), + env.get_puffin_manager(), + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_inverted_index_cache(cache.inverted_index_cache().cloned()) + .build(exprs) + .unwrap() + .map(Arc::new) + }; + + let build_bloom_filter_applier = |exprs: &[Expr]| { + BloomFilterIndexApplierBuilder::new( + FILE_DIR.to_string(), + object_store.clone(), + &metadata, + env.get_puffin_manager(), + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned()) + .build(exprs) + .unwrap() + .map(Arc::new) + }; + + // Data: ts tag_0 tag_1 + // Data: 0-20 [a, d] + // 0-20 [b, d] + // 0-20 [c, d] + // 0-40 [c, f] + // 100-200 [c, h] + // + // Pred: tag_0 = "b" + // + // Row groups & rows pruning: + // + // Row Groups: + // - min-max: filter out row groups 1..=3 + // + // Rows: + // - inverted index: hit row group 0, hit 20 rows + let preds = vec![col("tag_0").eq(lit("b"))]; + let inverted_index_applier = build_inverted_index_applier(&preds); + let bloom_filter_applier = 
build_bloom_filter_applier(&preds); + + let builder = + ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone()) + .predicate(Some(Predicate::new(preds))) + .inverted_index_applier(inverted_index_applier.clone()) + .bloom_filter_index_applier(bloom_filter_applier.clone()) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + let mut reader = ParquetReader::new(Arc::new(context), selection) + .await + .unwrap(); + check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await; + + assert_eq!(metrics.filter_metrics.rg_total, 4); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3); + assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0); + assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30); + let cached = index_result_cache + .get( + inverted_index_applier.unwrap().predicate_key(), + handle.file_id(), + ) + .unwrap(); + // inverted index will search all row groups + assert!(cached.contains_row_group(0)); + assert!(cached.contains_row_group(1)); + assert!(cached.contains_row_group(2)); + assert!(cached.contains_row_group(3)); + + // Data: ts tag_0 tag_1 + // Data: 0-20 [a, d] + // 0-20 [b, d] + // 0-20 [c, d] + // 0-40 [c, f] + // 100-200 [c, h] + // + // Pred: 50 <= ts && ts < 200 && tag_1 = "d" + // + // Row groups & rows pruning: + // + // Row Groups: + // - min-max: filter out row groups 0..=1 + // - bloom filter: filter out row groups 2..=3 + let preds = vec![ + col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))), + col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))), + col("tag_1").eq(lit("d")), + ]; + let inverted_index_applier = build_inverted_index_applier(&preds); + let bloom_filter_applier = build_bloom_filter_applier(&preds); + + let builder = + ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone()) + .predicate(Some(Predicate::new(preds))) + .inverted_index_applier(inverted_index_applier.clone()) + .bloom_filter_index_applier(bloom_filter_applier.clone()) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + let mut reader = ParquetReader::new(Arc::new(context), selection) + .await + .unwrap(); + check_reader_result(&mut reader, &[]).await; + + assert_eq!(metrics.filter_metrics.rg_total, 4); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2); + assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2); + assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100); + let cached = index_result_cache + .get( + bloom_filter_applier.unwrap().predicate_key(), + handle.file_id(), + ) + .unwrap(); + assert!(cached.contains_row_group(2)); + assert!(cached.contains_row_group(3)); + assert!(!cached.contains_row_group(0)); + assert!(!cached.contains_row_group(1)); + + // Remove the pred of `ts`, continue to use the pred of `tag_1` + // to test if cache works. 
+ + // Data: ts tag_0 tag_1 + // Data: 0-20 [a, d] + // 0-20 [b, d] + // 0-20 [c, d] + // 0-40 [c, f] + // 100-200 [c, h] + // + // Pred: tag_1 = "d" + // + // Row groups & rows pruning: + // + // Row Groups: + // - bloom filter: filter out row groups 2..=3 + // + // Rows: + // - bloom filter: hit row group 0, hit 50 rows + // hit row group 1, hit 10 rows + let preds = vec![col("tag_1").eq(lit("d"))]; + let inverted_index_applier = build_inverted_index_applier(&preds); + let bloom_filter_applier = build_bloom_filter_applier(&preds); + + let builder = + ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone()) + .predicate(Some(Predicate::new(preds))) + .inverted_index_applier(inverted_index_applier.clone()) + .bloom_filter_index_applier(bloom_filter_applier.clone()) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + let mut reader = ParquetReader::new(Arc::new(context), selection) + .await + .unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch_by_range(&["a", "d"], 0, 20), + new_batch_by_range(&["b", "d"], 0, 20), + new_batch_by_range(&["c", "d"], 0, 10), + new_batch_by_range(&["c", "d"], 10, 20), + ], + ) + .await; + + assert_eq!(metrics.filter_metrics.rg_total, 4); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); + assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2); + assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140); + let cached = index_result_cache + .get( + bloom_filter_applier.unwrap().predicate_key(), + handle.file_id(), + ) + .unwrap(); + assert!(cached.contains_row_group(0)); + assert!(cached.contains_row_group(1)); + assert!(cached.contains_row_group(2)); + assert!(cached.contains_row_group(3)); + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index ec25b0415a..571ad24e7c 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -378,14 +378,24 @@ impl ParquetReaderBuilder { } let fulltext_filtered = self - .prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics) + .prune_row_groups_by_fulltext_index( + row_group_size, + num_row_groups, + &mut output, + metrics, + ) .await; if output.is_empty() { return output; } - self.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics) - .await; + self.prune_row_groups_by_inverted_index( + row_group_size, + num_row_groups, + &mut output, + metrics, + ) + .await; if output.is_empty() { return output; } @@ -412,7 +422,7 @@ impl ParquetReaderBuilder { async fn prune_row_groups_by_fulltext_index( &self, row_group_size: usize, - parquet_meta: &ParquetMetaData, + num_row_groups: usize, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { @@ -425,14 +435,15 @@ impl ParquetReaderBuilder { let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. 
- if self.index_result_cache_get( - predicate_key, - self.file_handle.file_id(), - output, - metrics, - INDEX_TYPE_FULLTEXT, - ) { - return true; + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id())); + if let Some(result) = cached.as_ref() { + if all_required_row_groups_searched(output, result) { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT); + return true; + } } // Slow path: apply the index from the file. @@ -441,9 +452,7 @@ .apply_fine(self.file_handle.file_id(), Some(file_size_hint)) .await; let selection = match apply_res { - Ok(Some(res)) => { - RowGroupSelection::from_row_ids(res, row_group_size, parquet_meta.num_row_groups()) - } + Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups), Ok(None) => return false, Err(err) => { handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT); @@ -470,6 +479,7 @@ async fn prune_row_groups_by_inverted_index( &self, row_group_size: usize, + num_row_groups: usize, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, ) -> bool { @@ -482,14 +492,15 @@ let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. - if self.index_result_cache_get( - predicate_key, - self.file_handle.file_id(), - output, - metrics, - INDEX_TYPE_INVERTED, - ) { - return true; + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id())); + if let Some(result) = cached.as_ref() { + if all_required_row_groups_searched(output, result) { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED); + return true; + } } // Slow path: apply the index from the file. @@ -498,9 +509,11 @@ .apply(self.file_handle.file_id(), Some(file_size_hint)) .await; let selection = match apply_res { - Ok(output) => { - RowGroupSelection::from_inverted_index_apply_output(row_group_size, output) - } + Ok(output) => RowGroupSelection::from_inverted_index_apply_output( + row_group_size, + num_row_groups, + output, + ), Err(err) => { handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED); return false; @@ -534,27 +547,34 @@ let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. - if self.index_result_cache_get( - predicate_key, - self.file_handle.file_id(), - output, - metrics, - INDEX_TYPE_BLOOM, - ) { - return true; + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id())); + if let Some(result) = cached.as_ref() { + if all_required_row_groups_searched(output, result) { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM); + return true; + } } // Slow path: apply the index from the file. let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let rgs = parquet_meta - .row_groups() - .iter() - .enumerate() - .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))); + let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| { + ( + rg.num_rows() as usize, + // Optimize: only search the row groups that are required by `output` and not already stored in `cached`.
+ output.contains_non_empty_row_group(i) + && cached + .as_ref() + .map(|c| !c.contains_row_group(i)) + .unwrap_or(true), + ) + }); let apply_res = index_applier .apply(self.file_handle.file_id(), Some(file_size_hint), rgs) .await; - let selection = match apply_res { + let mut selection = match apply_res { Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size), Err(err) => { handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM); @@ -562,6 +582,11 @@ } }; + // Newly searched row groups are added to `selection`; concat them with `cached`. + if let Some(cached) = cached.as_ref() { + selection.concat(cached); + } + self.apply_index_result_and_update_cache( predicate_key, self.file_handle.file_id(), @@ -589,27 +614,34 @@ let predicate_key = index_applier.predicate_key(); // Fast path: return early if the result is in the cache. - if self.index_result_cache_get( - predicate_key, - self.file_handle.file_id(), - output, - metrics, - INDEX_TYPE_FULLTEXT, - ) { - return true; + let cached = self + .cache_strategy + .index_result_cache() + .and_then(|cache| cache.get(predicate_key, self.file_handle.file_id())); + if let Some(result) = cached.as_ref() { + if all_required_row_groups_searched(output, result) { + apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT); + return true; + } } // Slow path: apply the index from the file. let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let rgs = parquet_meta - .row_groups() - .iter() - .enumerate() - .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))); + let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| { + ( + rg.num_rows() as usize, + // Optimize: only search the row groups that are required by `output` and not already stored in `cached`. + output.contains_non_empty_row_group(i) + && cached + .as_ref() + .map(|c| !c.contains_row_group(i)) + .unwrap_or(true), + ) + }); let apply_res = index_applier .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs) .await; - let selection = match apply_res { + let mut selection = match apply_res { Ok(Some(apply_output)) => { RowGroupSelection::from_row_ranges(apply_output, row_group_size) } @@ -620,6 +652,11 @@ } }; + // Newly searched row groups are added to `selection`; concat them with `cached`. + if let Some(cached) = cached.as_ref() { + selection.concat(cached); + } + self.apply_index_result_and_update_cache( predicate_key, self.file_handle.file_id(), @@ -674,24 +711,6 @@ true } - fn index_result_cache_get( - &self, - predicate_key: &PredicateKey, - file_id: FileId, - output: &mut RowGroupSelection, - metrics: &mut ReaderFilterMetrics, - index_type: &str, - ) -> bool { - if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() { - let result = index_result_cache.get(predicate_key, file_id); - if let Some(result) = result { - apply_selection_and_update_metrics(output, &result, metrics, index_type); - return true; - } - } - false - } - fn apply_index_result_and_update_cache( &self, predicate_key: &PredicateKey, @@ -725,6 +744,18 @@ fn apply_selection_and_update_metrics( *output = intersection; } +fn all_required_row_groups_searched( + required_row_groups: &RowGroupSelection, + cached_row_groups: &RowGroupSelection, +) -> bool { + required_row_groups.iter().all(|(rg_id, _)| { + // A row group with no selected rows does not need to be searched. + !required_row_groups.contains_non_empty_row_group(*rg_id) + // The row group is already searched. + || cached_row_groups.contains_row_group(*rg_id) + }) +} + /// Metrics of filtering rows groups and rows. #[derive(Debug, Default, Clone, Copy)] pub(crate) struct ReaderFilterMetrics { @@ -1131,7 +1162,10 @@ impl Drop for ParquetReader { impl ParquetReader { /// Creates a new reader. - async fn new(context: FileRangeContextRef, mut selection: RowGroupSelection) -> Result<ParquetReader> { + pub(crate) async fn new( + context: FileRangeContextRef, + mut selection: RowGroupSelection, + ) -> Result<ParquetReader> { // No more items in current row group, reads next row group. let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() { let parquet_reader = context diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index ce8ba377d3..efe2bae8ef 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -31,7 +31,7 @@ pub struct RowGroupSelection { } /// A row selection with its count. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] struct RowSelectionWithCount { /// Row selection. selection: RowSelection, @@ -95,6 +95,7 @@ impl RowGroupSelection { /// * The last row group may have fewer rows than `row_group_size` pub fn from_inverted_index_apply_output( row_group_size: usize, + num_row_groups: usize, apply_output: ApplyOutput, ) -> Self { // Step 1: Convert segment IDs to row ranges within row groups @@ -116,7 +117,7 @@ // Step 2: Group ranges by row group ID and create row selections let mut total_row_count = 0; let mut total_selector_len = 0; - let selection_in_rg = row_group_ranges + let mut selection_in_rg = row_group_ranges .chunk_by(|(row_group_id, _)| *row_group_id) .into_iter() .map(|(row_group_id, group)| { @@ -141,7 +142,9 @@ }, ) }) - .collect(); + .collect::<BTreeMap<_, _>>(); + + Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups); Self { selection_in_rg, @@ -173,7 +176,7 @@ // Step 2: Create row selections for each row group let mut total_row_count = 0; let mut total_selector_len = 0; - let selection_in_rg = row_group_to_row_ids + let mut selection_in_rg = row_group_to_row_ids .into_iter() .map(|(row_group_id, row_ids)| { let selection = @@ -191,7 +194,9 @@ }, ) }) - .collect(); + .collect::<BTreeMap<_, _>>(); + + Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups); Self { selection_in_rg, @@ -324,19 +329,26 @@ } /// Returns the first row group in the selection. + /// + /// Skips row groups whose row count is 0. pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> { - let ( + while let Some(( row_group_id, RowSelectionWithCount { selection, row_count, selector_len, }, - ) = self.selection_in_rg.pop_first()?; + )) = self.selection_in_rg.pop_first() + { + if row_count > 0 { + self.row_count -= row_count; + self.selector_len -= selector_len; + return Some((row_group_id, selection)); + } + } - self.row_count -= row_count; - self.selector_len -= selector_len; - Some((row_group_id, selection)) + None } /// Removes a row group from the selection. @@ -363,6 +375,14 @@ self.selection_in_rg.contains_key(&row_group_id) } + /// Returns true if the selection contains a row group with the given ID and the row selection is not empty.
+ pub fn contains_non_empty_row_group(&self, row_group_id: usize) -> bool { + self.selection_in_rg + .get(&row_group_id) + .map(|r| r.row_count > 0) + .unwrap_or(false) + } + /// Returns an iterator over the row groups in the selection. pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> { self.selection_in_rg @@ -375,6 +395,32 @@ self.selector_len * size_of::<RowSelector>() + self.selection_in_rg.len() * size_of::<usize>() } + + /// Concatenates `other` into `self`. `other` must not contain row groups that `self` already contains. + /// + /// Panics if `other` contains a row group that is already in `self`. + pub fn concat(&mut self, other: &Self) { + for (rg_id, other_rs) in other.selection_in_rg.iter() { + if self.selection_in_rg.contains_key(rg_id) { + panic!("row group {} is already in `self`", rg_id); + } + + self.selection_in_rg.insert(*rg_id, other_rs.clone()); + self.row_count += other_rs.row_count; + self.selector_len += other_rs.selector_len; + } + } + + /// Fills the missing row groups with empty selections. + /// This indicates that the row groups were searched even if no rows were found. + fn fill_missing_row_groups( + selection_in_rg: &mut BTreeMap<usize, RowSelectionWithCount>, + num_row_groups: usize, + ) { + for rg_id in 0..num_row_groups { + selection_in_rg.entry(rg_id).or_default(); + } + } } /// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s. @@ -678,15 +724,14 @@ mod tests { let selection = RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups); assert_eq!(selection.row_count(), 0); - assert_eq!(selection.row_group_count(), 0); - assert!(selection.get(0).is_none()); + assert_eq!(selection.row_group_count(), 3); // Test with consecutive row IDs let consecutive_row_ids: BTreeSet<RowId> = vec![5, 6, 7, 8, 9].into_iter().collect(); let selection = RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups); assert_eq!(selection.row_count(), 5); - assert_eq!(selection.row_group_count(), 1); + assert_eq!(selection.row_group_count(), 3); let row_selection = selection.get(0).unwrap(); assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9 @@ -1047,4 +1092,25 @@ mod tests { let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size); assert!(!empty_selection.contains_row_group(0)); } + + #[test] + fn test_concat() { + let row_group_size = 100; + let ranges1 = vec![ + (0, vec![5..15]), // Within [0, 100) + (1, vec![5..15]), // Within [0, 100) + ]; + + let ranges2 = vec![ + (2, vec![5..15]), // Within [0, 100) + (3, vec![5..15]), // Within [0, 100) + ]; + + let mut selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size); + let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size); + + selection1.concat(&selection2); + assert_eq!(selection1.row_count(), 40); + assert_eq!(selection1.row_group_count(), 4); + } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 0a84b7267b..e8eacf152f 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -202,6 +202,8 @@ pub(crate) enum LogStoreImpl { pub struct TestEnv { /// Path to store data. data_home: TempDir, + intermediate_manager: IntermediateManager, + puffin_manager: PuffinManagerFactory, log_store: Option<LogStoreImpl>, log_store_factory: LogStoreFactory, object_store_manager: Option<ObjectStoreManagerRef>, schema_metadata_manager: SchemaMetadataManagerRef, kv_backend: KvBackendRef, } -impl Default for TestEnv { - fn default() -> Self { - TestEnv::new() - } -} impl TestEnv { /// Returns a new env with empty prefix for test.
- pub fn new() -> TestEnv { - let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager(); - TestEnv { - data_home: create_temp_dir(""), - log_store: None, - log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), - object_store_manager: None, - schema_metadata_manager, - kv_backend, - } + pub async fn new() -> TestEnv { + Self::with_prefix("").await } /// Returns a new env with specific `prefix` for test. - pub fn with_prefix(prefix: &str) -> TestEnv { - let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager(); - TestEnv { - data_home: create_temp_dir(prefix), - log_store: None, - log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), - object_store_manager: None, - schema_metadata_manager, - kv_backend, - } + pub async fn with_prefix(prefix: &str) -> TestEnv { + Self::with_data_home(create_temp_dir(prefix)).await } /// Returns a new env with specific `data_home` for test. - pub fn with_data_home(data_home: TempDir) -> TestEnv { + pub async fn with_data_home(data_home: TempDir) -> TestEnv { let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager(); + + let index_aux_path = data_home.path().join("index_aux"); + let puffin_manager = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) + .await + .unwrap(); + let intermediate_manager = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) + .await + .unwrap(); + TestEnv { data_home, + intermediate_manager, + puffin_manager, log_store: None, log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory), object_store_manager: None, @@ -587,17 +578,15 @@ impl TestEnv { local_store: ObjectStore, capacity: ReadableSize, ) -> WriteCacheRef { - let index_aux_path = self.data_home.path().join("index_aux"); - let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) - .await - .unwrap(); - let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) - .await - .unwrap(); - - let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr) - .await - .unwrap(); + let write_cache = WriteCache::new( + local_store, + capacity, + None, + self.puffin_manager.clone(), + self.intermediate_manager.clone(), + ) + .await + .unwrap(); Arc::new(write_cache) } @@ -608,17 +597,15 @@ impl TestEnv { path: &str, capacity: ReadableSize, ) -> WriteCacheRef { - let index_aux_path = self.data_home.path().join("index_aux"); - let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None) - .await - .unwrap(); - let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap()) - .await - .unwrap(); - - let write_cache = WriteCache::new_fs(path, capacity, None, puffin_mgr, intm_mgr) - .await - .unwrap(); + let write_cache = WriteCache::new_fs( + path, + capacity, + None, + self.puffin_manager.clone(), + self.intermediate_manager.clone(), + ) + .await + .unwrap(); Arc::new(write_cache) } @@ -634,6 +621,14 @@ impl TestEnv { pub(crate) fn get_log_store(&self) -> Option<LogStoreImpl> { self.log_store.as_ref().cloned() } + + pub fn get_puffin_manager(&self) -> PuffinManagerFactory { + self.puffin_manager.clone() + } + + pub fn get_intermediate_manager(&self) -> IntermediateManager { + self.intermediate_manager.clone() + } } /// Builder to mock a [RegionCreateRequest].
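A minimal, self-contained sketch (stub types, not the crate's real ones) of why the `TestEnv` constructors above became async: the env now awaits a one-time setup of the index auxiliary managers under `<data_home>/index_aux` and hands out cheap clones, instead of rebuilding them in every `create_write_cache` call:

// Stub types standing in for PuffinManagerFactory / IntermediateManager.
#[derive(Clone)]
struct PuffinManagerFactoryStub;
#[derive(Clone)]
struct IntermediateManagerStub;

struct Env {
    puffin_manager: PuffinManagerFactoryStub,
    intermediate_manager: IntermediateManagerStub,
}

impl Env {
    // Async because the real constructor awaits filesystem setup
    // (PuffinManagerFactory::new and IntermediateManager::init_fs).
    async fn new() -> Env {
        Env {
            puffin_manager: PuffinManagerFactoryStub,
            intermediate_manager: IntermediateManagerStub,
        }
    }

    // Accessors clone shared handles, mirroring get_puffin_manager /
    // get_intermediate_manager above.
    fn get_puffin_manager(&self) -> PuffinManagerFactoryStub {
        self.puffin_manager.clone()
    }

    fn get_intermediate_manager(&self) -> IntermediateManagerStub {
        self.intermediate_manager.clone()
    }
}

fn main() {
    // Call sites change from `Env::new()` to `Env::new().await` in async tests;
    // here the future is only constructed for illustration.
    let _fut = Env::new();
}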
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index e8fc2e7803..c132cd0d44 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -20,7 +20,7 @@ use api::v1::{OpType, SemanticType}; use common_time::Timestamp; use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::prelude::ConcreteDataType; -use datatypes::schema::ColumnSchema; +use datatypes::schema::{ColumnSchema, SkippingIndexOptions}; use datatypes::value::ValueRef; use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField}; use parquet::file::metadata::ParquetMetaData; @@ -57,7 +57,12 @@ pub fn sst_region_metadata() -> RegionMetadata { "tag_1".to_string(), ConcreteDataType::string_datatype(), true, - ), + ) + .with_skipping_options(SkippingIndexOptions { + granularity: 1, + ..Default::default() + }) + .unwrap(), semantic_type: SemanticType::Tag, column_id: 1, }) diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index a45dc5277f..6d9a2153dc 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1142,7 +1142,7 @@ mod tests { #[tokio::test] async fn test_worker_group_start_stop() { - let env = TestEnv::with_prefix("group-stop"); + let env = TestEnv::with_prefix("group-stop").await; let group = env .create_worker_group(MitoConfig { num_workers: 4, diff --git a/src/servers/dashboard/VERSION b/src/servers/dashboard/VERSION index bf057dbfd9..c91125db53 100644 --- a/src/servers/dashboard/VERSION +++ b/src/servers/dashboard/VERSION @@ -1 +1 @@ -v0.10.0 +v0.10.1 diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs index 050ae5889e..de8244ecec 100644 --- a/src/servers/src/elasticsearch.rs +++ b/src/servers/src/elasticsearch.rs @@ -39,6 +39,7 @@ use crate::http::event::{ extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState, PipelineIngestRequest, }; +use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME; use crate::metrics::{ METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED, }; @@ -132,7 +133,7 @@ async fn do_handle_bulk_api( // The `schema` is already set in the query_ctx in auth process. query_ctx.set_channel(Channel::Elasticsearch); - let db = params.db.unwrap_or_else(|| "public".to_string()); + let db = query_ctx.current_schema(); // Record the ingestion time histogram. let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED @@ -140,11 +141,12 @@ async fn do_handle_bulk_api( .start_timer(); // If pipeline_name is not provided, use the internal pipeline. - let pipeline_name = if let Some(pipeline) = params.pipeline_name { - pipeline - } else { - GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string() - }; + let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| { + headers + .get(GREPTIME_PIPELINE_NAME_HEADER_NAME) + .and_then(|v| v.to_str().ok()) + .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME) + }); // Read the ndjson payload and convert it to a vector of Value. 
let requests = match parse_bulk_request(&payload, &index, &params.msg_field) { @@ -164,7 +166,7 @@ }; let log_num = requests.len(); - let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) { + let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) { Ok(pipeline) => pipeline, Err(e) => { // should be unreachable diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 38575de1ac..dd7a8804ab 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -117,7 +117,7 @@ const DEFAULT_BODY_LIMIT: ReadableSize = ReadableSize::mb(64); pub const AUTHORIZATION_HEADER: &str = "x-greptime-auth"; // TODO(fys): This is a temporary workaround, it will be improved later -pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"]; +pub static PUBLIC_APIS: [&str; 3] = ["/v1/influxdb/ping", "/v1/influxdb/health", "/v1/health"]; #[derive(Default)] pub struct HttpServer { @@ -306,7 +306,8 @@ pub enum GreptimeQueryOutput { #[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] pub enum ResponseFormat { Arrow, - Csv, + // (with_names, with_types) + Csv(bool, bool), Table, #[default] GreptimedbV1, @@ -318,7 +319,9 @@ impl ResponseFormat { pub fn parse(s: &str) -> Option<ResponseFormat> { match s { "arrow" => Some(ResponseFormat::Arrow), - "csv" => Some(ResponseFormat::Csv), + "csv" => Some(ResponseFormat::Csv(false, false)), + "csvwithnames" => Some(ResponseFormat::Csv(true, false)), + "csvwithnamesandtypes" => Some(ResponseFormat::Csv(true, true)), "table" => Some(ResponseFormat::Table), "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), @@ -330,7 +333,7 @@ impl ResponseFormat { pub fn as_str(&self) -> &'static str { match self { ResponseFormat::Arrow => "arrow", - ResponseFormat::Csv => "csv", + ResponseFormat::Csv(_, _) => "csv", ResponseFormat::Table => "table", ResponseFormat::GreptimedbV1 => "greptimedb_v1", ResponseFormat::InfluxdbV1 => "influxdb_v1", @@ -720,6 +723,10 @@ impl HttpServer { "/health", routing::get(handler::health).post(handler::health), ) + .route( + &format!("/{HTTP_API_VERSION}/health"), + routing::get(handler::health).post(handler::health), + ) .route( "/ready", routing::get(handler::health).post(handler::health), @@ -1296,6 +1303,16 @@ mod test { "*" ); + let res = client.get("/v1/health").send().await; + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN) + .expect("expect cors header origin"), + "*" + ); + let res = client .options("/health") .header("Access-Control-Request-Headers", "x-greptime-auth") @@ -1480,7 +1497,7 @@ mod test { for format in [ ResponseFormat::GreptimedbV1, ResponseFormat::InfluxdbV1, - ResponseFormat::Csv, + ResponseFormat::Csv(true, true), ResponseFormat::Table, ResponseFormat::Arrow, ResponseFormat::Json, @@ -1490,7 +1507,9 @@ mod test { let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))]; let json_resp = match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await, - ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::Csv(with_names, with_types) => { + CsvResponse::from_output(outputs, with_names, with_types).await + } ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await, @@ -1583,4
+1602,46 @@ mod test { } } } + + #[test] + fn test_response_format_misc() { + assert_eq!(ResponseFormat::default(), ResponseFormat::GreptimedbV1); + assert_eq!(ResponseFormat::parse("arrow"), Some(ResponseFormat::Arrow)); + assert_eq!( + ResponseFormat::parse("csv"), + Some(ResponseFormat::Csv(false, false)) + ); + assert_eq!( + ResponseFormat::parse("csvwithnames"), + Some(ResponseFormat::Csv(true, false)) + ); + assert_eq!( + ResponseFormat::parse("csvwithnamesandtypes"), + Some(ResponseFormat::Csv(true, true)) + ); + assert_eq!(ResponseFormat::parse("table"), Some(ResponseFormat::Table)); + assert_eq!( + ResponseFormat::parse("greptimedb_v1"), + Some(ResponseFormat::GreptimedbV1) + ); + assert_eq!( + ResponseFormat::parse("influxdb_v1"), + Some(ResponseFormat::InfluxdbV1) + ); + assert_eq!(ResponseFormat::parse("json"), Some(ResponseFormat::Json)); + + // invalid formats + assert_eq!(ResponseFormat::parse("invalid"), None); + assert_eq!(ResponseFormat::parse(""), None); + assert_eq!(ResponseFormat::parse("CSV"), None); // Case sensitive + + // as str + assert_eq!(ResponseFormat::Arrow.as_str(), "arrow"); + assert_eq!(ResponseFormat::Csv(false, false).as_str(), "csv"); + assert_eq!(ResponseFormat::Csv(true, true).as_str(), "csv"); + assert_eq!(ResponseFormat::Table.as_str(), "table"); + assert_eq!(ResponseFormat::GreptimedbV1.as_str(), "greptimedb_v1"); + assert_eq!(ResponseFormat::InfluxdbV1.as_str(), "influxdb_v1"); + assert_eq!(ResponseFormat::Json.as_str(), "json"); + } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index b21af27f10..b6b332d711 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -138,7 +138,9 @@ pub async fn sql( ResponseFormat::Arrow => { ArrowResponse::from_output(outputs, query_params.compression).await } - ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::Csv(with_names, with_types) => { + CsvResponse::from_output(outputs, with_names, with_types).await + } ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, @@ -327,7 +329,9 @@ pub async fn promql( match format { ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await, - ResponseFormat::Csv => CsvResponse::from_output(outputs).await, + ResponseFormat::Csv(with_names, with_types) => { + CsvResponse::from_output(outputs, with_names, with_types).await + } ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, diff --git a/src/servers/src/http/result/csv_result.rs b/src/servers/src/http/result/csv_result.rs index ab662d8176..cf4af29c24 100644 --- a/src/servers/src/http/result/csv_result.rs +++ b/src/servers/src/http/result/csv_result.rs @@ -18,9 +18,9 @@ use common_error::status_code::StatusCode; use common_query::Output; use mime_guess::mime; use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; -// use super::process_with_limit; use crate::http::result::error_result::ErrorResponse; use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -28,10 +28,16 @@ use crate::http::{handler, 
process_with_limit, GreptimeQueryOutput, HttpResponse pub struct CsvResponse { output: Vec<GreptimeQueryOutput>, execution_time_ms: u64, + with_names: bool, + with_types: bool, } impl CsvResponse { - pub async fn from_output(outputs: Vec<Result<Output>>) -> HttpResponse { + pub async fn from_output( + outputs: Vec<Result<Output>>, + with_names: bool, + with_types: bool, + ) -> HttpResponse { match handler::from_output(outputs).await { Err(err) => HttpResponse::Error(err), Ok((output, _)) => { @@ -41,10 +47,14 @@ impl CsvResponse { "cannot output multi-statements result in csv format".to_string(), )) } else { - HttpResponse::Csv(CsvResponse { + let csv_resp = CsvResponse { output, execution_time_ms: 0, - }) + with_names: false, + with_types: false, + }; + + HttpResponse::Csv(csv_resp.with_names(with_names).with_types(with_types)) } } } @@ -67,6 +77,21 @@ impl CsvResponse { self.output = process_with_limit(self.output, limit); self } + + pub fn with_names(mut self, with_names: bool) -> Self { + self.with_names = with_names; + self + } + + pub fn with_types(mut self, with_types: bool) -> Self { + self.with_types = with_types; + + // If `with_types` is true, then always set `with_names` to true. + if with_types { + self.with_names = true; + } + self + } } macro_rules! http_try { @@ -100,11 +125,50 @@ impl IntoResponse for CsvResponse { format!("{n}\n") } Some(GreptimeQueryOutput::Records(records)) => { - let mut wtr = csv::Writer::from_writer(Vec::new()); + let mut wtr = csv::WriterBuilder::new() + .terminator(csv::Terminator::CRLF) // RFC 4180 + .from_writer(Vec::new()); + + if self.with_names { + let names = records + .schema + .column_schemas + .iter() + .map(|c| &c.name) + .collect::<Vec<_>>(); + http_try!(wtr.serialize(names)); + } + + if self.with_types { + let types = records + .schema + .column_schemas + .iter() + .map(|c| &c.data_type) + .collect::<Vec<_>>(); + http_try!(wtr.serialize(types)); + } for row in records.rows { + let row = row + .into_iter() + .map(|value| { + match value { + // Cast array and object to string + JsonValue::Array(a) => { + JsonValue::String(serde_json::to_string(&a).unwrap_or_default()) + } + JsonValue::Object(o) => { + JsonValue::String(serde_json::to_string(&o).unwrap_or_default()) + } + v => v, + } + }) + .collect::<Vec<_>>(); + http_try!(wtr.serialize(row)); } + http_try!(wtr.flush()); let bytes = http_try!(wtr.into_inner()); @@ -122,7 +186,9 @@ .into_response(); resp.headers_mut().insert( &GREPTIME_DB_HEADER_FORMAT, - HeaderValue::from_static(ResponseFormat::Csv.as_str()), + HeaderValue::from_static( + ResponseFormat::Csv(self.with_names, self.with_types).as_str(), + ), ); resp.headers_mut().insert( &GREPTIME_DB_HEADER_EXECUTION_TIME, @@ -131,3 +197,97 @@ resp } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::Output; + use common_recordbatch::{RecordBatch, RecordBatches}; + use datatypes::prelude::{ConcreteDataType, ScalarVector}; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::{BinaryVector, Float32Vector, StringVector, UInt32Vector, VectorRef}; + + use super::*; + #[tokio::test] + async fn test_csv_response_with_names_and_types() { + let (schema, columns) = create_test_data(); + + let data = r#"1,,-1000.1400146484375,"{""a"":{""b"":2},""b"":2,""c"":3}" +2,hello,1.9900000095367432,"{""a"":4,""b"":{""c"":6},""c"":6}""# + .replace("\n", "\r\n"); + + // Test with_names=true, with_types=true + { + let body = get_csv_body(&schema, &columns, true, true).await;
assert!(body.starts_with("col1,col2,col3,col4\r\nUInt32,String,Float32,Json\r\n")); + assert!(body.contains(&data)); + } + + // Test with_names=true, with_types=false + { + let body = get_csv_body(&schema, &columns, true, false).await; + assert!(body.starts_with("col1,col2,col3,col4\r\n")); + assert!(!body.contains("UInt32,String,Float32,Json")); + assert!(body.contains(&data)); + } + + // Test with_names=false, with_types=false + { + let body = get_csv_body(&schema, &columns, false, false).await; + assert!(!body.starts_with("col1,col2,col3,col4")); + assert!(!body.contains("UInt32,String,Float32,Json")); + assert!(body.contains(&data)); + } + } + + fn create_test_data() -> (Arc<Schema>, Vec<VectorRef>) { + let column_schemas = vec![ + ColumnSchema::new("col1", ConcreteDataType::uint32_datatype(), false), + ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true), + ColumnSchema::new("col3", ConcreteDataType::float32_datatype(), true), + ColumnSchema::new("col4", ConcreteDataType::json_datatype(), true), + ]; + let schema = Arc::new(Schema::new(column_schemas)); + + let json_strings = [ + r#"{"a": {"b": 2}, "b": 2, "c": 3}"#, + r#"{"a": 4, "b": {"c": 6}, "c": 6}"#, + ]; + + let jsonbs = json_strings + .iter() + .map(|s| { + let value = jsonb::parse_value(s.as_bytes()).unwrap(); + value.to_vec() + }) + .collect::<Vec<_>>(); + + let columns: Vec<VectorRef> = vec![ + Arc::new(UInt32Vector::from_slice(vec![1, 2])), + Arc::new(StringVector::from(vec![None, Some("hello")])), + Arc::new(Float32Vector::from_slice(vec![-1000.14, 1.99])), + Arc::new(BinaryVector::from_vec(jsonbs)), + ]; + + (schema, columns) + } + + async fn get_csv_body( + schema: &Arc<Schema>, + columns: &[VectorRef], + with_names: bool, + with_types: bool, + ) -> String { + let recordbatch = RecordBatch::new(schema.clone(), columns.to_vec()).unwrap(); + let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap(); + let output = Output::new_with_record_batches(recordbatches); + let outputs = vec![Ok(output)]; + + let resp = CsvResponse::from_output(outputs, with_names, with_types) + .await + .into_response(); + let bytes = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap(); + String::from_utf8(bytes.to_vec()).unwrap() + } +} diff --git a/src/servers/tests/http/http_handler_test.rs b/src/servers/tests/http/http_handler_test.rs index 85bd575da3..f76d27fd54 100644 --- a/src/servers/tests/http/http_handler_test.rs +++ b/src/servers/tests/http/http_handler_test.rs @@ -142,7 +142,7 @@ async fn test_sql_output_rows() { axum::body::to_bytes(resp.into_body(), usize::MAX) .await .unwrap(), - Bytes::from_static(b"4950\n"), + Bytes::from_static(b"4950\r\n"), ); } HttpResponse::Table(resp) => { @@ -289,7 +289,7 @@ async fn test_sql_form() { axum::body::to_bytes(resp.into_body(), usize::MAX) .await .unwrap(), - Bytes::from_static(b"4950\n"), + Bytes::from_static(b"4950\r\n"), ); } HttpResponse::Table(resp) => { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index db1df56004..77df110d06 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -453,7 +453,28 @@ pub async fn test_sql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let body = &res.text().await; // Must be escaped correctly: 66.6,0,"host, ""name" - assert_eq!(body, "66.6,0,\"host, \"\"name\"\n"); + assert_eq!(body, "66.6,0,\"host, \"\"name\"\r\n"); + + // csv with names + let res = client + .get("/v1/sql?format=csvWithNames&sql=select cpu,ts,host from demo limit 1") + .send() + .await;
assert_eq!(res.status(), StatusCode::OK); + let body = &res.text().await; + assert_eq!(body, "cpu,ts,host\r\n66.6,0,\"host, \"\"name\"\r\n"); + + // csv with names and types + let res = client + .get("/v1/sql?format=csvWithNamesAndTypes&sql=select cpu,ts,host from demo limit 1") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body = &res.text().await; + assert_eq!( + body, + "cpu,ts,host\r\nFloat64,TimestampMillisecond,String\r\n66.6,0,\"host, \"\"name\"\r\n" + ); // test parse method let res = client.get("/v1/sql/parse?sql=desc table t").send().await; @@ -523,7 +544,7 @@ pub async fn test_prometheus_promql_api(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); let csv_body = &res.text().await; - assert_eq!("0,1.0\n5000,1.0\n10000,1.0\n15000,1.0\n20000,1.0\n25000,1.0\n30000,1.0\n35000,1.0\n40000,1.0\n45000,1.0\n50000,1.0\n55000,1.0\n60000,1.0\n65000,1.0\n70000,1.0\n75000,1.0\n80000,1.0\n85000,1.0\n90000,1.0\n95000,1.0\n100000,1.0\n", csv_body); + assert_eq!("0,1.0\r\n5000,1.0\r\n10000,1.0\r\n15000,1.0\r\n20000,1.0\r\n25000,1.0\r\n30000,1.0\r\n35000,1.0\r\n40000,1.0\r\n45000,1.0\r\n50000,1.0\r\n55000,1.0\r\n60000,1.0\r\n65000,1.0\r\n70000,1.0\r\n75000,1.0\r\n80000,1.0\r\n85000,1.0\r\n90000,1.0\r\n95000,1.0\r\n100000,1.0\r\n", csv_body); guard.remove_all().await; }
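To summarize the formats exercised above: all three CSV variants terminate records with CRLF per RFC 4180, `csvWithNames` prepends a column-name row, and `csvWithNamesAndTypes` prepends a name row and a type row. A small, self-contained sketch using the exact bodies asserted by the integration tests:

fn main() {
    // Bodies returned for `select cpu,ts,host from demo limit 1`,
    // copied from the assertions above.
    let csv = "66.6,0,\"host, \"\"name\"\r\n";
    let csv_with_names = "cpu,ts,host\r\n66.6,0,\"host, \"\"name\"\r\n";
    let csv_with_names_and_types =
        "cpu,ts,host\r\nFloat64,TimestampMillisecond,String\r\n66.6,0,\"host, \"\"name\"\r\n";

    // RFC 4180: CRLF record terminator, embedded quotes doubled.
    assert!(csv.ends_with("\r\n"));
    assert!(csv_with_names.starts_with("cpu,ts,host\r\n"));
    // The second header row of csvWithNamesAndTypes carries the column types.
    assert_eq!(
        csv_with_names_and_types.lines().nth(1),
        Some("Float64,TimestampMillisecond,String")
    );
}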