feat: cherry-pick #6384 #6388 #6396 #6403 #6412 #6405 to 0.15 branch (#6414)

* feat: supports CsvWithNames and CsvWithNamesAndTypes formats (#6384)

* feat: supports CsvWithNames and CsvWithNamesAndTypes formats and object/array types

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: added and fixed tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: fix test

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comments

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* test: add json type csv tests

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>

* chore: remove comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: introduce /v1/health for healthcheck from external (#6388)

Signed-off-by: Ning Sun <sunning@greptime.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: update dashboard to v0.10.1 (#6396)

Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: complete partial index search results in cache (#6403)

* fix: complete partial index search results in cache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add initial tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* cover issue case

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* TestEnv new -> async

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: skip failing nodes when gathering porcess info (#6412)

* fix/process-manager-skip-fail-nodes:
 - **Enhance Error Handling in `process_manager.rs`:**
   Improved error handling by adding a warning log for failing nodes in the `list_process` method. This ensures that the process listing continues even if some nodes fail to respond.

 - **Add Error Type Import in `process_manager.rs`:**
   Included the `Error` type from the `error` module to handle errors more effectively within the `ProcessManager` implementation.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix/process-manager-skip-fail-nodes:
 **Enhancements to Debugging and Trait Implementation**

 - **`process_manager.rs`**: Improved logging by adding more detailed error messages when skipping failing nodes.
 - **`selector.rs`**: Enhanced the `FrontendClient` trait by adding the `Debug` trait bound to improve debugging capabilities.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: pass pipeline name through http header and get db from query context (#6405)

Signed-off-by: zyy17 <zyylsxm@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Signed-off-by: Ning Sun <sunning@greptime.com>
Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Signed-off-by: zyy17 <zyylsxm@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Ning Sun <sunng@protonmail.com>
Co-authored-by: ZonaHe <zonahe@qq.com>
Co-authored-by: ZonaHex <ZonaHex@users.noreply.github.com>
Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
Co-authored-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
Yingwen
2025-06-27 20:11:28 +08:00
committed by GitHub
parent 7cd6be41ce
commit f712c1b356
49 changed files with 1006 additions and 319 deletions

View File

@@ -21,7 +21,7 @@ use std::sync::{Arc, RwLock};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_telemetry::{debug, info};
use common_telemetry::{debug, info, warn};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use snafu::{ensure, OptionExt, ResultExt};
@@ -141,14 +141,20 @@ impl ProcessManager {
.await
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
processes.extend(
f.list_process(ListProcessRequest {
let result = f
.list_process(ListProcessRequest {
catalog: catalog.unwrap_or_default().to_string(),
})
.await
.context(error::InvokeFrontendSnafu)?
.processes,
);
.context(error::InvokeFrontendSnafu);
match result {
Ok(resp) => {
processes.extend(resp.processes);
}
Err(e) => {
warn!(e; "Skipping failing node: {:?}", f)
}
}
}
}
processes.extend(self.local_processes(catalog)?);

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -30,7 +31,7 @@ use crate::error::{MetaSnafu, Result};
pub type FrontendClientPtr = Box<dyn FrontendClient>;
#[async_trait::async_trait]
pub trait FrontendClient: Send {
pub trait FrontendClient: Send + Debug {
async fn list_process(&mut self, req: ListProcessRequest) -> Result<ListProcessResponse>;
async fn kill_process(&mut self, req: KillProcessRequest) -> Result<KillProcessResponse>;

View File

@@ -475,7 +475,7 @@ mod test {
async fn region_alive_keeper() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());

View File

@@ -278,7 +278,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("close-region");
let mut engine_env = TestEnv::with_prefix("close-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -326,7 +326,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-region");
let mut engine_env = TestEnv::with_prefix("open-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -374,7 +374,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -406,7 +406,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("downgrade-region");
let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);

View File

@@ -59,7 +59,7 @@ impl TestEnv {
/// Returns a new env with specific `prefix` and `config` for test.
pub async fn with_prefix_and_config(prefix: &str, config: EngineConfig) -> Self {
let mut mito_env = MitoTestEnv::with_prefix(prefix);
let mut mito_env = MitoTestEnv::with_prefix(prefix).await;
let mito = mito_env.create_engine(MitoConfig::default()).await;
let metric = MetricEngine::try_new(mito.clone(), config).unwrap();
Self {

View File

@@ -245,7 +245,7 @@ mod test {
let blob = create_inverted_index_blob().await;
// Init a test range reader in local fs.
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let file_size = blob.len() as u64;
let store = env.init_object_store_manager();
let temp_path = "data";

View File

@@ -31,6 +31,11 @@ use crate::sst::parquet::row_selection::RowGroupSelection;
const INDEX_RESULT_TYPE: &str = "index_result";
/// Cache for storing index query results.
///
/// The `RowGroupSelection` is a collection of row groups that match the predicate.
///
/// Row groups can be partially searched. Row groups that not contained in `RowGroupSelection` are not searched.
/// User can retrieve the partial results and handle uncontained row groups required by the predicate subsequently.
pub struct IndexResultCache {
cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
@@ -64,6 +69,8 @@ impl IndexResultCache {
}
/// Puts a query result into the cache.
///
/// Allow user to put a partial result (not containing all row groups) into the cache.
pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
let key = (key, file_id);
let size = Self::index_result_cache_weight(&key, &result);
@@ -74,6 +81,9 @@ impl IndexResultCache {
}
/// Gets a query result from the cache.
///
/// Note: the returned `RowGroupSelection` only contains the row groups that are searched.
/// Caller should handle the uncontained row groups required by the predicate subsequently.
pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
let res = self.cache.get(&(key.clone(), file_id));
if res.is_some() {

View File

@@ -449,7 +449,7 @@ mod tests {
async fn test_write_and_upload_sst() {
// TODO(QuenKar): maybe find a way to create some object server for testing,
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let mock_store = env.init_object_store_manager();
let path_provider = RegionFilePathFactory::new("test".to_string());
@@ -537,7 +537,7 @@ mod tests {
#[tokio::test]
async fn test_read_metadata_from_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
@@ -606,7 +606,7 @@ mod tests {
#[tokio::test]
async fn test_write_cache_clean_tmp_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();

View File

@@ -115,7 +115,7 @@ fn check_region_version(
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -211,7 +211,7 @@ fn build_rows_for_tags(
#[tokio::test]
async fn test_put_after_alter() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
@@ -316,7 +316,7 @@ async fn test_put_after_alter() {
async fn test_alter_region_retry() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -374,7 +374,7 @@ async fn test_alter_region_retry() {
async fn test_alter_on_flushing() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -478,7 +478,7 @@ async fn test_alter_on_flushing() {
async fn test_alter_column_fulltext_options() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -597,7 +597,7 @@ async fn test_alter_column_fulltext_options() {
async fn test_alter_column_set_inverted_index() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -707,7 +707,7 @@ async fn test_alter_column_set_inverted_index() {
async fn test_alter_region_ttl_options() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -757,7 +757,7 @@ async fn test_alter_region_ttl_options() {
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(NotifyRegionChangeResultListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))

View File

@@ -31,7 +31,7 @@ use crate::test_util::{
async fn test_append_mode_write_query() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -89,7 +89,7 @@ async fn test_append_mode_write_query() {
#[tokio::test]
async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
..Default::default()

View File

@@ -42,7 +42,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_new_stop() {
let mut env = TestEnv::with_prefix("engine-stop");
let mut env = TestEnv::with_prefix("engine-stop").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -69,7 +69,7 @@ async fn test_engine_new_stop() {
#[tokio::test]
async fn test_write_to_region() {
let mut env = TestEnv::with_prefix("write-to-region");
let mut env = TestEnv::with_prefix("write-to-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -97,7 +97,9 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("region-replay").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("region-replay")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -173,7 +175,7 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
#[tokio::test]
async fn test_write_query_region() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -207,7 +209,7 @@ async fn test_write_query_region() {
#[tokio::test]
async fn test_different_order() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -268,7 +270,7 @@ async fn test_different_order() {
#[tokio::test]
async fn test_different_order_and_type() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -332,7 +334,7 @@ async fn test_different_order_and_type() {
async fn test_put_delete() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -384,7 +386,7 @@ async fn test_put_delete() {
#[tokio::test]
async fn test_delete_not_null_fields() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -433,7 +435,7 @@ async fn test_delete_not_null_fields() {
#[tokio::test]
async fn test_put_overwrite() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -493,7 +495,7 @@ async fn test_put_overwrite() {
#[tokio::test]
async fn test_absent_and_invalid_columns() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -541,7 +543,7 @@ async fn test_absent_and_invalid_columns() {
#[tokio::test]
async fn test_region_usage() {
let mut env = TestEnv::with_prefix("region_usage");
let mut env = TestEnv::with_prefix("region_usage").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -595,7 +597,7 @@ async fn test_region_usage() {
async fn test_engine_with_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let path = env.data_home().to_str().unwrap().to_string();
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None);
let engine = env.create_engine(mito_config).await;
@@ -635,7 +637,7 @@ async fn test_engine_with_write_cache() {
#[tokio::test]
async fn test_cache_null_primary_key() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
vector_cache_size: ReadableSize::mb(32),

View File

@@ -39,8 +39,9 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env =
TestEnv::with_prefix("open-batch-regions").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("open-batch-regions")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
@@ -160,8 +161,9 @@ async fn test_batch_open_err(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env =
TestEnv::with_prefix("open-batch-regions-err").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("open-batch-regions-err")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
let mut options = HashMap::new();

View File

@@ -57,7 +57,9 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::with_prefix("last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -175,8 +177,9 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
return;
};
let mut env =
TestEnv::with_prefix("incorrect_last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("incorrect_last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -277,8 +280,9 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
return;
};
let mut env =
TestEnv::with_prefix("without_last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("without_last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -380,8 +384,9 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
return;
};
let mut env =
TestEnv::with_prefix("without_manifest_update").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("without_manifest_update")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -545,7 +550,9 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::with_prefix("local_catchup").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("local_catchup")
.await
.with_log_store_factory(factory.clone());
let leader_engine = env.create_engine(MitoConfig::default()).await;
let Some(LogStoreImpl::RaftEngine(log_store)) = env.get_log_store() else {
unreachable!()
@@ -686,7 +693,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
#[tokio::test]
async fn test_catchup_not_exist() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let non_exist_region_id = RegionId::new(1, 1);

View File

@@ -21,7 +21,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_engine_close_region() {
let mut env = TestEnv::with_prefix("close");
let mut env = TestEnv::with_prefix("close").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -136,7 +136,7 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
#[tokio::test]
async fn test_compaction_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -202,7 +202,7 @@ async fn test_compaction_region() {
#[tokio::test]
async fn test_infer_compaction_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -341,7 +341,7 @@ async fn test_infer_compaction_time_window() {
#[tokio::test]
async fn test_compaction_overlapping_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -402,7 +402,7 @@ async fn test_compaction_overlapping_files() {
#[tokio::test]
async fn test_compaction_region_with_overlapping() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -450,7 +450,7 @@ async fn test_compaction_region_with_overlapping() {
#[tokio::test]
async fn test_compaction_region_with_overlapping_delete_all() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -506,7 +506,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
#[tokio::test]
async fn test_readonly_during_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(CompactionListener::default());
let engine = env
.create_engine_with(
@@ -590,7 +590,7 @@ async fn test_readonly_during_compaction() {
#[tokio::test]
async fn test_compaction_update_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -686,7 +686,7 @@ async fn test_compaction_update_time_window() {
#[tokio::test]
async fn test_change_region_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -811,7 +811,7 @@ async fn test_change_region_compaction_window() {
#[tokio::test]
async fn test_open_overwrite_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -26,7 +26,7 @@ use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder,
#[tokio::test]
async fn test_engine_create_new_region() {
let mut env = TestEnv::with_prefix("new-region");
let mut env = TestEnv::with_prefix("new-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -41,7 +41,7 @@ async fn test_engine_create_new_region() {
#[tokio::test]
async fn test_engine_create_existing_region() {
let mut env = TestEnv::with_prefix("create-existing");
let mut env = TestEnv::with_prefix("create-existing").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -61,7 +61,7 @@ async fn test_engine_create_existing_region() {
#[tokio::test]
async fn test_engine_create_close_create_region() {
// This test will trigger create_or_open function.
let mut env = TestEnv::with_prefix("create-close-create");
let mut env = TestEnv::with_prefix("create-close-create").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -91,7 +91,7 @@ async fn test_engine_create_close_create_region() {
#[tokio::test]
async fn test_engine_create_with_different_id() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -110,7 +110,7 @@ async fn test_engine_create_with_different_id() {
#[tokio::test]
async fn test_engine_create_with_different_schema() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -130,7 +130,7 @@ async fn test_engine_create_with_different_schema() {
#[tokio::test]
async fn test_engine_create_with_different_primary_key() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -150,7 +150,7 @@ async fn test_engine_create_with_different_primary_key() {
#[tokio::test]
async fn test_engine_create_with_options() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -172,7 +172,7 @@ async fn test_engine_create_with_options() {
#[tokio::test]
async fn test_engine_create_with_custom_store() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
@@ -204,7 +204,7 @@ async fn test_engine_create_with_custom_store() {
#[tokio::test]
async fn test_engine_create_with_memtable_opts() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -35,7 +35,7 @@ use crate::worker::DROPPING_MARKER_FILE;
async fn test_engine_drop_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("drop");
let mut env = TestEnv::with_prefix("drop").await;
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -143,7 +143,7 @@ async fn test_engine_drop_region_for_custom_store() {
put_rows(engine, region_id, rows).await;
flush_region(engine, region_id, None).await;
}
let mut env = TestEnv::with_prefix("drop");
let mut env = TestEnv::with_prefix("drop").await;
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with_multiple_object_stores(

View File

@@ -33,7 +33,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_edit_region_schedule_compaction() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<RegionId>>>,
@@ -122,7 +122,7 @@ async fn test_edit_region_schedule_compaction() {
#[tokio::test]
async fn test_edit_region_fill_cache() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<FileId>>>,
@@ -241,7 +241,7 @@ async fn test_edit_region_concurrently() {
}
}
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
// Suppress the compaction to not impede the speed of this kinda stress testing.

View File

@@ -28,7 +28,7 @@ use crate::test_util::{
async fn test_scan_without_filtering_deleted() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -41,7 +41,7 @@ use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
#[tokio::test]
async fn test_manual_flush() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -91,7 +91,7 @@ async fn test_manual_flush() {
#[tokio::test]
async fn test_flush_engine() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let engine = env
@@ -161,7 +161,7 @@ async fn test_flush_engine() {
#[tokio::test]
async fn test_write_stall() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(StallListener::default());
let engine = env
@@ -236,7 +236,7 @@ async fn test_write_stall() {
#[tokio::test]
async fn test_flush_empty() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let engine = env
.create_engine_with(
@@ -289,7 +289,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let mut env = TestEnv::new().await.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
@@ -396,7 +396,7 @@ impl MockTimeProvider {
#[tokio::test]
async fn test_auto_flush_engine() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let now = current_time_millis();
@@ -467,7 +467,7 @@ async fn test_auto_flush_engine() {
#[tokio::test]
async fn test_flush_workers() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let engine = env
@@ -554,7 +554,7 @@ async fn test_update_topic_latest_entry_id(factory: Option<LogStoreFactory>) {
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let mut env = TestEnv::new().await.with_log_store_factory(factory.clone());
let engine = env
.create_engine_with(
MitoConfig::default(),

View File

@@ -31,7 +31,7 @@ use crate::test_util::{
async fn test_merge_mode_write_query() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -89,7 +89,7 @@ async fn test_merge_mode_write_query() {
async fn test_merge_mode_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
..Default::default()

View File

@@ -36,7 +36,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_open_empty() {
let mut env = TestEnv::with_prefix("open-empty");
let mut env = TestEnv::with_prefix("open-empty").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -63,7 +63,7 @@ async fn test_engine_open_empty() {
#[tokio::test]
async fn test_engine_open_existing() {
let mut env = TestEnv::with_prefix("open-exiting");
let mut env = TestEnv::with_prefix("open-exiting").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -90,7 +90,7 @@ async fn test_engine_open_existing() {
#[tokio::test]
async fn test_engine_reopen_region() {
let mut env = TestEnv::with_prefix("reopen-region");
let mut env = TestEnv::with_prefix("reopen-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -107,7 +107,7 @@ async fn test_engine_reopen_region() {
#[tokio::test]
async fn test_engine_open_readonly() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -150,7 +150,7 @@ async fn test_engine_open_readonly() {
#[tokio::test]
async fn test_engine_region_open_with_options() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -190,7 +190,7 @@ async fn test_engine_region_open_with_options() {
#[tokio::test]
async fn test_engine_region_open_with_custom_store() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
@@ -244,7 +244,7 @@ async fn test_engine_region_open_with_custom_store() {
#[tokio::test]
async fn test_open_region_skip_wal_replay() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -344,7 +344,7 @@ async fn test_open_region_skip_wal_replay() {
#[tokio::test]
async fn test_open_region_wait_for_opening_region_ok() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok");
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
@@ -383,7 +383,7 @@ async fn test_open_region_wait_for_opening_region_ok() {
#[tokio::test]
async fn test_open_region_wait_for_opening_region_err() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-err");
let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
@@ -428,7 +428,7 @@ async fn test_open_region_wait_for_opening_region_err() {
#[tokio::test]
async fn test_open_compaction_region() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let mut mito_config = MitoConfig::default();
mito_config
.sanitize(&env.data_home().display().to_string())

View File

@@ -73,7 +73,7 @@ async fn scan_in_parallel(
#[tokio::test]
async fn test_parallel_scan() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -53,7 +53,7 @@ fn build_rows_multi_tags_fields(
#[tokio::test]
async fn test_scan_projection() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -27,7 +27,7 @@ use crate::test_util::{
};
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -147,7 +147,7 @@ fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr {
#[tokio::test]
async fn test_prune_memtable() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -221,7 +221,7 @@ async fn test_prune_memtable() {
#[tokio::test]
async fn test_prune_memtable_complex_expr() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -274,7 +274,7 @@ async fn test_prune_memtable_complex_expr() {
#[tokio::test]
async fn test_mem_range_prune() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -25,7 +25,7 @@ use crate::test_util::{
};
async fn test_last_row(append_mode: bool) {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -27,7 +27,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_scan_with_min_sst_sequence() {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence");
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -153,7 +153,7 @@ async fn test_scan_with_min_sst_sequence() {
#[tokio::test]
async fn test_series_scan() {
let mut env = TestEnv::with_prefix("test_series_scan");
let mut env = TestEnv::with_prefix("test_series_scan").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -32,7 +32,7 @@ async fn test_set_role_state_gracefully() {
SettableRegionRoleState::DowngradingLeader,
];
for settable_role_state in settable_role_states {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -101,7 +101,7 @@ async fn test_set_role_state_gracefully() {
#[tokio::test]
async fn test_set_role_state_gracefully_not_exist() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let non_exist_region_id = RegionId::new(1, 1);
@@ -116,7 +116,7 @@ async fn test_set_role_state_gracefully_not_exist() {
#[tokio::test]
async fn test_write_downgrading_region() {
let mut env = TestEnv::with_prefix("write-to-downgrading-region");
let mut env = TestEnv::with_prefix("write-to-downgrading-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -71,7 +71,7 @@ async fn scan_check(
#[tokio::test]
async fn test_sync_after_flush_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
@@ -163,7 +163,7 @@ async fn test_sync_after_flush_region() {
async fn test_sync_after_alter_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -33,7 +33,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_truncate_region_basic() {
let mut env = TestEnv::with_prefix("truncate-basic");
let mut env = TestEnv::with_prefix("truncate-basic").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -83,7 +83,7 @@ async fn test_engine_truncate_region_basic() {
#[tokio::test]
async fn test_engine_put_data_after_truncate() {
let mut env = TestEnv::with_prefix("truncate-put");
let mut env = TestEnv::with_prefix("truncate-put").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -146,7 +146,7 @@ async fn test_engine_put_data_after_truncate() {
#[tokio::test]
async fn test_engine_truncate_after_flush() {
let mut env = TestEnv::with_prefix("truncate-flush");
let mut env = TestEnv::with_prefix("truncate-flush").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -223,7 +223,7 @@ async fn test_engine_truncate_after_flush() {
#[tokio::test]
async fn test_engine_truncate_reopen() {
let mut env = TestEnv::with_prefix("truncate-reopen");
let mut env = TestEnv::with_prefix("truncate-reopen").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -282,7 +282,7 @@ async fn test_engine_truncate_reopen() {
#[tokio::test]
async fn test_engine_truncate_during_flush() {
init_default_ut_logging();
let mut env = TestEnv::with_prefix("truncate-during-flush");
let mut env = TestEnv::with_prefix("truncate-during-flush").await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushTruncateListener::default());
let engine = env

View File

@@ -594,7 +594,7 @@ mod test {
#[tokio::test]
async fn create_manifest_manager() {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new();
let env = TestEnv::new().await;
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
@@ -606,7 +606,7 @@ mod test {
#[tokio::test]
async fn open_manifest_manager() {
let env = TestEnv::new();
let env = TestEnv::new().await;
// Try to opens an empty manifest.
assert!(env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
@@ -637,7 +637,7 @@ mod test {
#[tokio::test]
async fn region_change_add_column() {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new();
let env = TestEnv::new().await;
let mut manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
@@ -696,7 +696,7 @@ mod test {
let metadata = Arc::new(basic_region_metadata());
let data_home = create_temp_dir("");
let data_home_path = data_home.path().to_str().unwrap().to_string();
let env = TestEnv::with_data_home(data_home);
let env = TestEnv::with_data_home(data_home).await;
let manifest_dir = format!("{}/manifest", data_home_path);

View File

@@ -34,7 +34,7 @@ async fn build_manager(
compress_type: CompressionType,
) -> (TestEnv, RegionManifestManager) {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new();
let env = TestEnv::new().await;
let manager = env
.create_manifest_manager(compress_type, checkpoint_distance, Some(metadata.clone()))
.await

View File

@@ -128,6 +128,9 @@ impl BloomFilterIndexApplier {
/// list of row group ranges that match the predicates.
///
/// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
///
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
pub async fn apply(
&self,
file_id: FileId,
@@ -195,7 +198,6 @@ impl BloomFilterIndexApplier {
range.end -= start;
}
}
output.retain(|(_, ranges)| !ranges.is_empty());
Ok(output)
}
@@ -386,6 +388,9 @@ mod tests {
.apply(file_id, None, row_groups.into_iter())
.await
.unwrap()
.into_iter()
.filter(|(_, ranges)| !ranges.is_empty())
.collect()
})
}
}

View File

@@ -231,6 +231,9 @@ impl FulltextIndexApplier {
impl FulltextIndexApplier {
/// Applies coarse-grained fulltext index to the specified SST file.
/// Returns (row group id -> ranges) that match the queries.
///
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
pub async fn apply_coarse(
&self,
file_id: FileId,
@@ -367,7 +370,7 @@ impl FulltextIndexApplier {
/// Adjusts the coarse output. Makes the output ranges based on row group start.
fn adjust_coarse_output(
input: Vec<(usize, Range<usize>)>,
output: &mut Vec<(usize, Vec<Range<usize>>)>,
output: &mut [(usize, Vec<Range<usize>>)],
) {
// adjust ranges to be based on row group
for ((_, output), (_, input)) in output.iter_mut().zip(input) {
@@ -377,7 +380,6 @@ impl FulltextIndexApplier {
range.end -= start;
}
}
output.retain(|(_, ranges)| !ranges.is_empty());
}
/// Converts terms to predicates.

View File

@@ -625,6 +625,7 @@ mod tests {
.unwrap();
resp.map(|r| {
r.into_iter()
.filter(|(_, ranges)| !ranges.is_empty())
.map(|(row_group_id, _)| row_group_id as RowId)
.collect()
})

View File

@@ -14,10 +14,8 @@
use common_telemetry::{debug, warn};
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use store_api::storage::ColumnId;
use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
use crate::sst::index::inverted_index::creator::InvertedIndexer;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{
@@ -113,13 +111,14 @@ impl Indexer {
return true;
};
let column_ids = indexer.column_ids().collect();
let err = match indexer.finish(puffin_writer).await {
Ok((row_count, byte_count)) => {
self.fill_inverted_index_output(
&mut index_output.inverted_index,
row_count,
byte_count,
&indexer,
column_ids,
);
return true;
}
@@ -150,13 +149,14 @@ impl Indexer {
return true;
};
let column_ids = indexer.column_ids().collect();
let err = match indexer.finish(puffin_writer).await {
Ok((row_count, byte_count)) => {
self.fill_fulltext_index_output(
&mut index_output.fulltext_index,
row_count,
byte_count,
&indexer,
column_ids,
);
return true;
}
@@ -187,13 +187,14 @@ impl Indexer {
return true;
};
let column_ids = indexer.column_ids().collect();
let err = match indexer.finish(puffin_writer).await {
Ok((row_count, byte_count)) => {
self.fill_bloom_filter_output(
&mut index_output.bloom_filter,
row_count,
byte_count,
&indexer,
column_ids,
);
return true;
}
@@ -220,16 +221,16 @@ impl Indexer {
output: &mut InvertedIndexOutput,
row_count: RowCount,
byte_count: ByteCount,
indexer: &InvertedIndexer,
column_ids: Vec<ColumnId>,
) {
debug!(
"Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
self.region_id, self.file_id, byte_count, row_count
"Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
self.region_id, self.file_id, byte_count, row_count, column_ids
);
output.index_size = byte_count;
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
output.columns = column_ids;
}
fn fill_fulltext_index_output(
@@ -237,16 +238,16 @@ impl Indexer {
output: &mut FulltextIndexOutput,
row_count: RowCount,
byte_count: ByteCount,
indexer: &FulltextIndexer,
column_ids: Vec<ColumnId>,
) {
debug!(
"Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
self.region_id, self.file_id, byte_count, row_count
"Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
self.region_id, self.file_id, byte_count, row_count, column_ids
);
output.index_size = byte_count;
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
output.columns = column_ids;
}
fn fill_bloom_filter_output(
@@ -254,15 +255,15 @@ impl Indexer {
output: &mut BloomFilterOutput,
row_count: RowCount,
byte_count: ByteCount,
indexer: &BloomFilterIndexer,
column_ids: Vec<ColumnId>,
) {
debug!(
"Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
self.region_id, self.file_id, byte_count, row_count
"Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
self.region_id, self.file_id, byte_count, row_count, column_ids
);
output.index_size = byte_count;
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
output.columns = column_ids;
}
}

View File

@@ -88,11 +88,12 @@ pub struct SstInfo {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};
use datatypes::arrow;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
@@ -104,12 +105,17 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
use crate::access_layer::{FilePathProvider, OperationType, RegionFilePathFactory};
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::read::BatchReader;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::test_util::sst_util::{
@@ -147,7 +153,7 @@ mod tests {
#[tokio::test]
async fn test_write_read() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = FixedPathProvider {
@@ -205,7 +211,7 @@ mod tests {
#[tokio::test]
async fn test_read_with_cache() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -275,7 +281,7 @@ mod tests {
#[tokio::test]
async fn test_parquet_metadata_eq() {
// create test env
let mut env = crate::test_util::TestEnv::new();
let mut env = crate::test_util::TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -318,7 +324,7 @@ mod tests {
#[tokio::test]
async fn test_read_with_tag_filter() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -370,7 +376,7 @@ mod tests {
#[tokio::test]
async fn test_read_empty_batch() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -407,7 +413,7 @@ mod tests {
#[tokio::test]
async fn test_read_with_field_filter() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -453,7 +459,7 @@ mod tests {
#[tokio::test]
async fn test_read_large_binary() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
@@ -544,7 +550,7 @@ mod tests {
async fn test_write_multiple_files() {
common_telemetry::init_default_ut_logging();
// create test env
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let metadata = Arc::new(sst_region_metadata());
let batches = &[
@@ -593,4 +599,300 @@ mod tests {
}
assert_eq!(total_rows, rows_read);
}
#[tokio::test]
async fn test_write_read_with_index() {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string());
let metadata = Arc::new(sst_region_metadata());
let row_group_size = 50;
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 20),
new_batch_by_range(&["b", "d"], 0, 20),
new_batch_by_range(&["c", "d"], 0, 20),
new_batch_by_range(&["c", "f"], 0, 40),
new_batch_by_range(&["c", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let puffin_manager = env
.get_puffin_manager()
.build(object_store.clone(), file_path.clone());
let intermediate_manager = env.get_intermediate_manager();
let indexer_builder = IndexerBuilderImpl {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size,
puffin_manager,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
segment_row_count: 1,
..Default::default()
},
},
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
)
.await;
let info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
assert!(info.index_metadata.inverted_index.index_size > 0);
assert_eq!(info.index_metadata.inverted_index.row_count, 200);
assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
assert!(info.index_metadata.bloom_filter.index_size > 0);
assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
assert_eq!(
(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(199)
),
info.time_range
);
let handle = FileHandle::new(
FileMeta {
region_id: metadata.region_id,
file_id: info.file_id,
time_range: info.time_range,
level: 0,
file_size: info.file_size,
available_indexes: info.index_metadata.build_available_indexes(),
index_file_size: info.index_metadata.file_size,
num_row_groups: info.num_row_groups,
num_rows: info.num_rows as u64,
sequence: None,
},
Arc::new(NoopFilePurger),
);
let cache = Arc::new(
CacheManager::builder()
.index_result_cache_size(1024 * 1024)
.index_metadata_size(1024 * 1024)
.index_content_page_size(1024 * 1024)
.index_content_size(1024 * 1024)
.puffin_metadata_size(1024 * 1024)
.build(),
);
let index_result_cache = cache.index_result_cache().unwrap();
let build_inverted_index_applier = |exprs: &[Expr]| {
InvertedIndexApplierBuilder::new(
FILE_DIR.to_string(),
object_store.clone(),
&metadata,
HashSet::from_iter([0]),
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_inverted_index_cache(cache.inverted_index_cache().cloned())
.build(exprs)
.unwrap()
.map(Arc::new)
};
let build_bloom_filter_applier = |exprs: &[Expr]| {
BloomFilterIndexApplierBuilder::new(
FILE_DIR.to_string(),
object_store.clone(),
&metadata,
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
.build(exprs)
.unwrap()
.map(Arc::new)
};
// Data: ts tag_0 tag_1
// Data: 0-20 [a, d]
// 0-20 [b, d]
// 0-20 [c, d]
// 0-40 [c, f]
// 100-200 [c, h]
//
// Pred: tag_0 = "b"
//
// Row groups & rows pruning:
//
// Row Groups:
// - min-max: filter out row groups 1..=3
//
// Rows:
// - inverted index: hit row group 0, hit 20 rows
let preds = vec![col("tag_0").eq(lit("b"))];
let inverted_index_applier = build_inverted_index_applier(&preds);
let bloom_filter_applier = build_bloom_filter_applier(&preds);
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
let cached = index_result_cache
.get(
inverted_index_applier.unwrap().predicate_key(),
handle.file_id(),
)
.unwrap();
// inverted index will search all row groups
assert!(cached.contains_row_group(0));
assert!(cached.contains_row_group(1));
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
// Data: ts tag_0 tag_1
// Data: 0-20 [a, d]
// 0-20 [b, d]
// 0-20 [c, d]
// 0-40 [c, f]
// 100-200 [c, h]
//
// Pred: 50 <= ts && ts < 200 && tag_1 = "d"
//
// Row groups & rows pruning:
//
// Row Groups:
// - min-max: filter out row groups 0..=1
// - bloom filter: filter out row groups 2..=3
let preds = vec![
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
col("tag_1").eq(lit("d")),
];
let inverted_index_applier = build_inverted_index_applier(&preds);
let bloom_filter_applier = build_bloom_filter_applier(&preds);
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
check_reader_result(&mut reader, &[]).await;
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id(),
)
.unwrap();
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
assert!(!cached.contains_row_group(0));
assert!(!cached.contains_row_group(1));
// Remove the pred of `ts`, continue to use the pred of `tag_1`
// to test if cache works.
// Data: ts tag_0 tag_1
// Data: 0-20 [a, d]
// 0-20 [b, d]
// 0-20 [c, d]
// 0-40 [c, f]
// 100-200 [c, h]
//
// Pred: tag_1 = "d"
//
// Row groups & rows pruning:
//
// Row Groups:
// - bloom filter: filter out row groups 2..=3
//
// Rows:
// - bloom filter: hit row group 0, hit 50 rows
// hit row group 1, hit 10 rows
let preds = vec![col("tag_1").eq(lit("d"))];
let inverted_index_applier = build_inverted_index_applier(&preds);
let bloom_filter_applier = build_bloom_filter_applier(&preds);
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 20),
new_batch_by_range(&["b", "d"], 0, 20),
new_batch_by_range(&["c", "d"], 0, 10),
new_batch_by_range(&["c", "d"], 10, 20),
],
)
.await;
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id(),
)
.unwrap();
assert!(cached.contains_row_group(0));
assert!(cached.contains_row_group(1));
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
}
}

View File

@@ -378,14 +378,24 @@ impl ParquetReaderBuilder {
}
let fulltext_filtered = self
.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
.prune_row_groups_by_fulltext_index(
row_group_size,
num_row_groups,
&mut output,
metrics,
)
.await;
if output.is_empty() {
return output;
}
self.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics)
.await;
self.prune_row_groups_by_inverted_index(
row_group_size,
num_row_groups,
&mut output,
metrics,
)
.await;
if output.is_empty() {
return output;
}
@@ -412,7 +422,7 @@ impl ParquetReaderBuilder {
async fn prune_row_groups_by_fulltext_index(
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
@@ -425,14 +435,15 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_FULLTEXT,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
return true;
}
}
// Slow path: apply the index from the file.
@@ -441,9 +452,7 @@ impl ParquetReaderBuilder {
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(Some(res)) => {
RowGroupSelection::from_row_ids(res, row_group_size, parquet_meta.num_row_groups())
}
Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
Ok(None) => return false,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
@@ -470,6 +479,7 @@ impl ParquetReaderBuilder {
async fn prune_row_groups_by_inverted_index(
&self,
row_group_size: usize,
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
@@ -482,14 +492,15 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_INVERTED,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
return true;
}
}
// Slow path: apply the index from the file.
@@ -498,9 +509,11 @@ impl ParquetReaderBuilder {
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(output) => {
RowGroupSelection::from_inverted_index_apply_output(row_group_size, output)
}
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
num_row_groups,
output,
),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
return false;
@@ -534,27 +547,34 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_BLOOM,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
return true;
}
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta
.row_groups()
.iter()
.enumerate()
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimize: only search the row group that required by `output` and not stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let selection = match apply_res {
let mut selection = match apply_res {
Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
@@ -562,6 +582,11 @@ impl ParquetReaderBuilder {
}
};
// New searched row groups are added to `selection`, concat them with `cached`.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
}
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id(),
@@ -589,27 +614,34 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_FULLTEXT,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
return true;
}
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta
.row_groups()
.iter()
.enumerate()
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimize: only search the row group that required by `output` and not stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let selection = match apply_res {
let mut selection = match apply_res {
Ok(Some(apply_output)) => {
RowGroupSelection::from_row_ranges(apply_output, row_group_size)
}
@@ -620,6 +652,11 @@ impl ParquetReaderBuilder {
}
};
// New searched row groups are added to `selection`, concat them with `cached`.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
}
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id(),
@@ -674,24 +711,6 @@ impl ParquetReaderBuilder {
true
}
fn index_result_cache_get(
&self,
predicate_key: &PredicateKey,
file_id: FileId,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
index_type: &str,
) -> bool {
if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
let result = index_result_cache.get(predicate_key, file_id);
if let Some(result) = result {
apply_selection_and_update_metrics(output, &result, metrics, index_type);
return true;
}
}
false
}
fn apply_index_result_and_update_cache(
&self,
predicate_key: &PredicateKey,
@@ -725,6 +744,18 @@ fn apply_selection_and_update_metrics(
*output = intersection;
}
fn all_required_row_groups_searched(
required_row_groups: &RowGroupSelection,
cached_row_groups: &RowGroupSelection,
) -> bool {
required_row_groups.iter().all(|(rg_id, _)| {
// Row group with no rows is not required to search.
!required_row_groups.contains_non_empty_row_group(*rg_id)
// The row group is already searched.
|| cached_row_groups.contains_row_group(*rg_id)
})
}
/// Metrics of filtering rows groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
@@ -1131,7 +1162,10 @@ impl Drop for ParquetReader {
impl ParquetReader {
/// Creates a new reader.
async fn new(context: FileRangeContextRef, mut selection: RowGroupSelection) -> Result<Self> {
pub(crate) async fn new(
context: FileRangeContextRef,
mut selection: RowGroupSelection,
) -> Result<Self> {
// No more items in current row group, reads next row group.
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let parquet_reader = context

View File

@@ -31,7 +31,7 @@ pub struct RowGroupSelection {
}
/// A row selection with its count.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
struct RowSelectionWithCount {
/// Row selection.
selection: RowSelection,
@@ -95,6 +95,7 @@ impl RowGroupSelection {
/// * The last row group may have fewer rows than `row_group_size`
pub fn from_inverted_index_apply_output(
row_group_size: usize,
num_row_groups: usize,
apply_output: ApplyOutput,
) -> Self {
// Step 1: Convert segment IDs to row ranges within row groups
@@ -116,7 +117,7 @@ impl RowGroupSelection {
// Step 2: Group ranges by row group ID and create row selections
let mut total_row_count = 0;
let mut total_selector_len = 0;
let selection_in_rg = row_group_ranges
let mut selection_in_rg = row_group_ranges
.chunk_by(|(row_group_id, _)| *row_group_id)
.into_iter()
.map(|(row_group_id, group)| {
@@ -141,7 +142,9 @@ impl RowGroupSelection {
},
)
})
.collect();
.collect::<BTreeMap<_, _>>();
Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
Self {
selection_in_rg,
@@ -173,7 +176,7 @@ impl RowGroupSelection {
// Step 2: Create row selections for each row group
let mut total_row_count = 0;
let mut total_selector_len = 0;
let selection_in_rg = row_group_to_row_ids
let mut selection_in_rg = row_group_to_row_ids
.into_iter()
.map(|(row_group_id, row_ids)| {
let selection =
@@ -191,7 +194,9 @@ impl RowGroupSelection {
},
)
})
.collect();
.collect::<BTreeMap<_, _>>();
Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
Self {
selection_in_rg,
@@ -324,19 +329,26 @@ impl RowGroupSelection {
}
/// Returns the first row group in the selection.
///
/// Skip the row group if the row count is 0.
pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> {
let (
while let Some((
row_group_id,
RowSelectionWithCount {
selection,
row_count,
selector_len,
},
) = self.selection_in_rg.pop_first()?;
)) = self.selection_in_rg.pop_first()
{
if row_count > 0 {
self.row_count -= row_count;
self.selector_len -= selector_len;
return Some((row_group_id, selection));
}
}
self.row_count -= row_count;
self.selector_len -= selector_len;
Some((row_group_id, selection))
None
}
/// Removes a row group from the selection.
@@ -363,6 +375,14 @@ impl RowGroupSelection {
self.selection_in_rg.contains_key(&row_group_id)
}
/// Returns true if the selection contains a row group with the given ID and the row selection is not empty.
pub fn contains_non_empty_row_group(&self, row_group_id: usize) -> bool {
self.selection_in_rg
.get(&row_group_id)
.map(|r| r.row_count > 0)
.unwrap_or(false)
}
/// Returns an iterator over the row groups in the selection.
pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> {
self.selection_in_rg
@@ -375,6 +395,32 @@ impl RowGroupSelection {
self.selector_len * size_of::<RowSelector>()
+ self.selection_in_rg.len() * size_of::<RowSelectionWithCount>()
}
/// Concatenates `other` into `self`. `other` must not contain row groups that `self` contains.
///
/// Panics if `self` contains row groups that `other` contains.
pub fn concat(&mut self, other: &Self) {
for (rg_id, other_rs) in other.selection_in_rg.iter() {
if self.selection_in_rg.contains_key(rg_id) {
panic!("row group {} is already in `self`", rg_id);
}
self.selection_in_rg.insert(*rg_id, other_rs.clone());
self.row_count += other_rs.row_count;
self.selector_len += other_rs.selector_len;
}
}
/// Fills the missing row groups with empty selections.
/// This is to indicate that the row groups are searched even if no rows are found.
fn fill_missing_row_groups(
selection_in_rg: &mut BTreeMap<usize, RowSelectionWithCount>,
num_row_groups: usize,
) {
for rg_id in 0..num_row_groups {
selection_in_rg.entry(rg_id).or_default();
}
}
}
/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
@@ -678,15 +724,14 @@ mod tests {
let selection =
RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 0);
assert_eq!(selection.row_group_count(), 0);
assert!(selection.get(0).is_none());
assert_eq!(selection.row_group_count(), 3);
// Test with consecutive row IDs
let consecutive_row_ids: BTreeSet<u32> = vec![5, 6, 7, 8, 9].into_iter().collect();
let selection =
RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 5);
assert_eq!(selection.row_group_count(), 1);
assert_eq!(selection.row_group_count(), 3);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9
@@ -1047,4 +1092,25 @@ mod tests {
let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
assert!(!empty_selection.contains_row_group(0));
}
#[test]
fn test_concat() {
let row_group_size = 100;
let ranges1 = vec![
(0, vec![5..15]), // Within [0, 100)
(1, vec![5..15]), // Within [0, 100)
];
let ranges2 = vec![
(2, vec![5..15]), // Within [0, 100)
(3, vec![5..15]), // Within [0, 100)
];
let mut selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size);
let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
selection1.concat(&selection2);
assert_eq!(selection1.row_count(), 40);
assert_eq!(selection1.row_group_count(), 4);
}
}

View File

@@ -202,6 +202,8 @@ pub(crate) enum LogStoreImpl {
pub struct TestEnv {
/// Path to store data.
data_home: TempDir,
intermediate_manager: IntermediateManager,
puffin_manager: PuffinManagerFactory,
log_store: Option<LogStoreImpl>,
log_store_factory: LogStoreFactory,
object_store_manager: Option<ObjectStoreManagerRef>,
@@ -209,44 +211,33 @@ pub struct TestEnv {
kv_backend: KvBackendRef,
}
impl Default for TestEnv {
fn default() -> Self {
TestEnv::new()
}
}
impl TestEnv {
/// Returns a new env with empty prefix for test.
pub fn new() -> TestEnv {
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
TestEnv {
data_home: create_temp_dir(""),
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager,
kv_backend,
}
pub async fn new() -> TestEnv {
Self::with_prefix("").await
}
/// Returns a new env with specific `prefix` for test.
pub fn with_prefix(prefix: &str) -> TestEnv {
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
TestEnv {
data_home: create_temp_dir(prefix),
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager,
kv_backend,
}
pub async fn with_prefix(prefix: &str) -> TestEnv {
Self::with_data_home(create_temp_dir(prefix)).await
}
/// Returns a new env with specific `data_home` for test.
pub fn with_data_home(data_home: TempDir) -> TestEnv {
pub async fn with_data_home(data_home: TempDir) -> TestEnv {
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
let index_aux_path = data_home.path().join("index_aux");
let puffin_manager = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intermediate_manager = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
TestEnv {
data_home,
intermediate_manager,
puffin_manager,
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
@@ -587,17 +578,15 @@ impl TestEnv {
local_store: ObjectStore,
capacity: ReadableSize,
) -> WriteCacheRef {
let index_aux_path = self.data_home.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr)
.await
.unwrap();
let write_cache = WriteCache::new(
local_store,
capacity,
None,
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
)
.await
.unwrap();
Arc::new(write_cache)
}
@@ -608,17 +597,15 @@ impl TestEnv {
path: &str,
capacity: ReadableSize,
) -> WriteCacheRef {
let index_aux_path = self.data_home.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let write_cache = WriteCache::new_fs(path, capacity, None, puffin_mgr, intm_mgr)
.await
.unwrap();
let write_cache = WriteCache::new_fs(
path,
capacity,
None,
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
)
.await
.unwrap();
Arc::new(write_cache)
}
@@ -634,6 +621,14 @@ impl TestEnv {
pub(crate) fn get_log_store(&self) -> Option<LogStoreImpl> {
self.log_store.as_ref().cloned()
}
pub fn get_puffin_manager(&self) -> PuffinManagerFactory {
self.puffin_manager.clone()
}
pub fn get_intermediate_manager(&self) -> IntermediateManager {
self.intermediate_manager.clone()
}
}
/// Builder to mock a [RegionCreateRequest].

View File

@@ -20,7 +20,7 @@ use api::v1::{OpType, SemanticType};
use common_time::Timestamp;
use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use parquet::file::metadata::ParquetMetaData;
@@ -57,7 +57,12 @@ pub fn sst_region_metadata() -> RegionMetadata {
"tag_1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
)
.with_skipping_options(SkippingIndexOptions {
granularity: 1,
..Default::default()
})
.unwrap(),
semantic_type: SemanticType::Tag,
column_id: 1,
})

View File

@@ -1142,7 +1142,7 @@ mod tests {
#[tokio::test]
async fn test_worker_group_start_stop() {
let env = TestEnv::with_prefix("group-stop");
let env = TestEnv::with_prefix("group-stop").await;
let group = env
.create_worker_group(MitoConfig {
num_workers: 4,

View File

@@ -1 +1 @@
v0.10.0
v0.10.1

View File

@@ -39,6 +39,7 @@ use crate::http::event::{
extract_pipeline_params_map_from_headers, ingest_logs_inner, LogIngesterQueryParams, LogState,
PipelineIngestRequest,
};
use crate::http::header::constants::GREPTIME_PIPELINE_NAME_HEADER_NAME;
use crate::metrics::{
METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
};
@@ -132,7 +133,7 @@ async fn do_handle_bulk_api(
// The `schema` is already set in the query_ctx in auth process.
query_ctx.set_channel(Channel::Elasticsearch);
let db = params.db.unwrap_or_else(|| "public".to_string());
let db = query_ctx.current_schema();
// Record the ingestion time histogram.
let _timer = METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED
@@ -140,11 +141,12 @@ async fn do_handle_bulk_api(
.start_timer();
// If pipeline_name is not provided, use the internal pipeline.
let pipeline_name = if let Some(pipeline) = params.pipeline_name {
pipeline
} else {
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
};
let pipeline_name = params.pipeline_name.as_deref().unwrap_or_else(|| {
headers
.get(GREPTIME_PIPELINE_NAME_HEADER_NAME)
.and_then(|v| v.to_str().ok())
.unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME)
});
// Read the ndjson payload and convert it to a vector of Value.
let requests = match parse_bulk_request(&payload, &index, &params.msg_field) {
@@ -164,7 +166,7 @@ async fn do_handle_bulk_api(
};
let log_num = requests.len();
let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) {
let pipeline = match PipelineDefinition::from_name(pipeline_name, None, None) {
Ok(pipeline) => pipeline,
Err(e) => {
// should be unreachable

View File

@@ -117,7 +117,7 @@ const DEFAULT_BODY_LIMIT: ReadableSize = ReadableSize::mb(64);
pub const AUTHORIZATION_HEADER: &str = "x-greptime-auth";
// TODO(fys): This is a temporary workaround, it will be improved later
pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"];
pub static PUBLIC_APIS: [&str; 3] = ["/v1/influxdb/ping", "/v1/influxdb/health", "/v1/health"];
#[derive(Default)]
pub struct HttpServer {
@@ -306,7 +306,8 @@ pub enum GreptimeQueryOutput {
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
Arrow,
Csv,
// (with_names, with_types)
Csv(bool, bool),
Table,
#[default]
GreptimedbV1,
@@ -318,7 +319,9 @@ impl ResponseFormat {
pub fn parse(s: &str) -> Option<Self> {
match s {
"arrow" => Some(ResponseFormat::Arrow),
"csv" => Some(ResponseFormat::Csv),
"csv" => Some(ResponseFormat::Csv(false, false)),
"csvwithnames" => Some(ResponseFormat::Csv(true, false)),
"csvwithnamesandtypes" => Some(ResponseFormat::Csv(true, true)),
"table" => Some(ResponseFormat::Table),
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
@@ -330,7 +333,7 @@ impl ResponseFormat {
pub fn as_str(&self) -> &'static str {
match self {
ResponseFormat::Arrow => "arrow",
ResponseFormat::Csv => "csv",
ResponseFormat::Csv(_, _) => "csv",
ResponseFormat::Table => "table",
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
ResponseFormat::InfluxdbV1 => "influxdb_v1",
@@ -720,6 +723,10 @@ impl HttpServer {
"/health",
routing::get(handler::health).post(handler::health),
)
.route(
&format!("/{HTTP_API_VERSION}/health"),
routing::get(handler::health).post(handler::health),
)
.route(
"/ready",
routing::get(handler::health).post(handler::health),
@@ -1296,6 +1303,16 @@ mod test {
"*"
);
let res = client.get("/v1/health").send().await;
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get(http::header::ACCESS_CONTROL_ALLOW_ORIGIN)
.expect("expect cors header origin"),
"*"
);
let res = client
.options("/health")
.header("Access-Control-Request-Headers", "x-greptime-auth")
@@ -1480,7 +1497,7 @@ mod test {
for format in [
ResponseFormat::GreptimedbV1,
ResponseFormat::InfluxdbV1,
ResponseFormat::Csv,
ResponseFormat::Csv(true, true),
ResponseFormat::Table,
ResponseFormat::Arrow,
ResponseFormat::Json,
@@ -1490,7 +1507,9 @@ mod test {
let outputs = vec![Ok(Output::new_with_record_batches(recordbatches))];
let json_resp = match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs, None).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Csv(with_names, with_types) => {
CsvResponse::from_output(outputs, with_names, with_types).await
}
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
@@ -1583,4 +1602,46 @@ mod test {
}
}
}
#[test]
fn test_response_format_misc() {
assert_eq!(ResponseFormat::default(), ResponseFormat::GreptimedbV1);
assert_eq!(ResponseFormat::parse("arrow"), Some(ResponseFormat::Arrow));
assert_eq!(
ResponseFormat::parse("csv"),
Some(ResponseFormat::Csv(false, false))
);
assert_eq!(
ResponseFormat::parse("csvwithnames"),
Some(ResponseFormat::Csv(true, false))
);
assert_eq!(
ResponseFormat::parse("csvwithnamesandtypes"),
Some(ResponseFormat::Csv(true, true))
);
assert_eq!(ResponseFormat::parse("table"), Some(ResponseFormat::Table));
assert_eq!(
ResponseFormat::parse("greptimedb_v1"),
Some(ResponseFormat::GreptimedbV1)
);
assert_eq!(
ResponseFormat::parse("influxdb_v1"),
Some(ResponseFormat::InfluxdbV1)
);
assert_eq!(ResponseFormat::parse("json"), Some(ResponseFormat::Json));
// invalid formats
assert_eq!(ResponseFormat::parse("invalid"), None);
assert_eq!(ResponseFormat::parse(""), None);
assert_eq!(ResponseFormat::parse("CSV"), None); // Case sensitive
// as str
assert_eq!(ResponseFormat::Arrow.as_str(), "arrow");
assert_eq!(ResponseFormat::Csv(false, false).as_str(), "csv");
assert_eq!(ResponseFormat::Csv(true, true).as_str(), "csv");
assert_eq!(ResponseFormat::Table.as_str(), "table");
assert_eq!(ResponseFormat::GreptimedbV1.as_str(), "greptimedb_v1");
assert_eq!(ResponseFormat::InfluxdbV1.as_str(), "influxdb_v1");
assert_eq!(ResponseFormat::Json.as_str(), "json");
}
}

View File

@@ -138,7 +138,9 @@ pub async fn sql(
ResponseFormat::Arrow => {
ArrowResponse::from_output(outputs, query_params.compression).await
}
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Csv(with_names, with_types) => {
CsvResponse::from_output(outputs, with_names, with_types).await
}
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
@@ -327,7 +329,9 @@ pub async fn promql(
match format {
ResponseFormat::Arrow => ArrowResponse::from_output(outputs, compression).await,
ResponseFormat::Csv => CsvResponse::from_output(outputs).await,
ResponseFormat::Csv(with_names, with_types) => {
CsvResponse::from_output(outputs, with_names, with_types).await
}
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,

View File

@@ -18,9 +18,9 @@ use common_error::status_code::StatusCode;
use common_query::Output;
use mime_guess::mime;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
// use super::process_with_limit;
use crate::http::result::error_result::ErrorResponse;
use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse, ResponseFormat};
@@ -28,10 +28,16 @@ use crate::http::{handler, process_with_limit, GreptimeQueryOutput, HttpResponse
pub struct CsvResponse {
output: Vec<GreptimeQueryOutput>,
execution_time_ms: u64,
with_names: bool,
with_types: bool,
}
impl CsvResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
pub async fn from_output(
outputs: Vec<crate::error::Result<Output>>,
with_names: bool,
with_types: bool,
) -> HttpResponse {
match handler::from_output(outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
@@ -41,10 +47,14 @@ impl CsvResponse {
"cannot output multi-statements result in csv format".to_string(),
))
} else {
HttpResponse::Csv(CsvResponse {
let csv_resp = CsvResponse {
output,
execution_time_ms: 0,
})
with_names: false,
with_types: false,
};
HttpResponse::Csv(csv_resp.with_names(with_names).with_types(with_types))
}
}
}
@@ -67,6 +77,21 @@ impl CsvResponse {
self.output = process_with_limit(self.output, limit);
self
}
pub fn with_names(mut self, with_names: bool) -> Self {
self.with_names = with_names;
self
}
pub fn with_types(mut self, with_types: bool) -> Self {
self.with_types = with_types;
// If `with_type` is true, than always set `with_names` to be true.
if with_types {
self.with_names = true;
}
self
}
}
macro_rules! http_try {
@@ -100,11 +125,50 @@ impl IntoResponse for CsvResponse {
format!("{n}\n")
}
Some(GreptimeQueryOutput::Records(records)) => {
let mut wtr = csv::Writer::from_writer(Vec::new());
let mut wtr = csv::WriterBuilder::new()
.terminator(csv::Terminator::CRLF) // RFC 4180
.from_writer(Vec::new());
if self.with_names {
let names = records
.schema
.column_schemas
.iter()
.map(|c| &c.name)
.collect::<Vec<_>>();
http_try!(wtr.serialize(names));
}
if self.with_types {
let types = records
.schema
.column_schemas
.iter()
.map(|c| &c.data_type)
.collect::<Vec<_>>();
http_try!(wtr.serialize(types));
}
for row in records.rows {
let row = row
.into_iter()
.map(|value| {
match value {
// Cast array and object to string
JsonValue::Array(a) => {
JsonValue::String(serde_json::to_string(&a).unwrap_or_default())
}
JsonValue::Object(o) => {
JsonValue::String(serde_json::to_string(&o).unwrap_or_default())
}
v => v,
}
})
.collect::<Vec<_>>();
http_try!(wtr.serialize(row));
}
http_try!(wtr.flush());
let bytes = http_try!(wtr.into_inner());
@@ -122,7 +186,9 @@ impl IntoResponse for CsvResponse {
.into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static(ResponseFormat::Csv.as_str()),
HeaderValue::from_static(
ResponseFormat::Csv(self.with_names, self.with_types).as_str(),
),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
@@ -131,3 +197,97 @@ impl IntoResponse for CsvResponse {
resp
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::prelude::{ConcreteDataType, ScalarVector};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{BinaryVector, Float32Vector, StringVector, UInt32Vector, VectorRef};
use super::*;
#[tokio::test]
async fn test_csv_response_with_names_and_types() {
let (schema, columns) = create_test_data();
let data = r#"1,,-1000.1400146484375,"{""a"":{""b"":2},""b"":2,""c"":3}"
2,hello,1.9900000095367432,"{""a"":4,""b"":{""c"":6},""c"":6}""#
.replace("\n", "\r\n");
// Test with_names=true, with_types=true
{
let body = get_csv_body(&schema, &columns, true, true).await;
assert!(body.starts_with("col1,col2,col3,col4\r\nUInt32,String,Float32,Json\r\n"));
assert!(body.contains(&data));
}
// Test with_names=true, with_types=false
{
let body = get_csv_body(&schema, &columns, true, false).await;
assert!(body.starts_with("col1,col2,col3,col4\r\n"));
assert!(!body.contains("UInt32,String,Float32,Json"));
assert!(body.contains(&data));
}
// Test with_names=false, with_types=false
{
let body = get_csv_body(&schema, &columns, false, false).await;
assert!(!body.starts_with("col1,col2,col3,col4"));
assert!(!body.contains("UInt32,String,Float32,Json"));
assert!(body.contains(&data));
}
}
fn create_test_data() -> (Arc<Schema>, Vec<VectorRef>) {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new("col2", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("col3", ConcreteDataType::float32_datatype(), true),
ColumnSchema::new("col4", ConcreteDataType::json_datatype(), true),
];
let schema = Arc::new(Schema::new(column_schemas));
let json_strings = [
r#"{"a": {"b": 2}, "b": 2, "c": 3}"#,
r#"{"a": 4, "b": {"c": 6}, "c": 6}"#,
];
let jsonbs = json_strings
.iter()
.map(|s| {
let value = jsonb::parse_value(s.as_bytes()).unwrap();
value.to_vec()
})
.collect::<Vec<_>>();
let columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2])),
Arc::new(StringVector::from(vec![None, Some("hello")])),
Arc::new(Float32Vector::from_slice(vec![-1000.14, 1.99])),
Arc::new(BinaryVector::from_vec(jsonbs)),
];
(schema, columns)
}
async fn get_csv_body(
schema: &Arc<Schema>,
columns: &[VectorRef],
with_names: bool,
with_types: bool,
) -> String {
let recordbatch = RecordBatch::new(schema.clone(), columns.to_vec()).unwrap();
let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch]).unwrap();
let output = Output::new_with_record_batches(recordbatches);
let outputs = vec![Ok(output)];
let resp = CsvResponse::from_output(outputs, with_names, with_types)
.await
.into_response();
let bytes = axum::body::to_bytes(resp.into_body(), 1024).await.unwrap();
String::from_utf8(bytes.to_vec()).unwrap()
}
}

View File

@@ -142,7 +142,7 @@ async fn test_sql_output_rows() {
axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap(),
Bytes::from_static(b"4950\n"),
Bytes::from_static(b"4950\r\n"),
);
}
HttpResponse::Table(resp) => {
@@ -289,7 +289,7 @@ async fn test_sql_form() {
axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap(),
Bytes::from_static(b"4950\n"),
Bytes::from_static(b"4950\r\n"),
);
}
HttpResponse::Table(resp) => {

View File

@@ -453,7 +453,28 @@ pub async fn test_sql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let body = &res.text().await;
// Must be escaped correctly: 66.6,0,"host, ""name"
assert_eq!(body, "66.6,0,\"host, \"\"name\"\n");
assert_eq!(body, "66.6,0,\"host, \"\"name\"\r\n");
// csv with names
let res = client
.get("/v1/sql?format=csvWithNames&sql=select cpu,ts,host from demo limit 1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = &res.text().await;
assert_eq!(body, "cpu,ts,host\r\n66.6,0,\"host, \"\"name\"\r\n");
// csv with names and types
let res = client
.get("/v1/sql?format=csvWithNamesAndTypes&sql=select cpu,ts,host from demo limit 1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body = &res.text().await;
assert_eq!(
body,
"cpu,ts,host\r\nFloat64,TimestampMillisecond,String\r\n66.6,0,\"host, \"\"name\"\r\n"
);
// test parse method
let res = client.get("/v1/sql/parse?sql=desc table t").send().await;
@@ -523,7 +544,7 @@ pub async fn test_prometheus_promql_api(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let csv_body = &res.text().await;
assert_eq!("0,1.0\n5000,1.0\n10000,1.0\n15000,1.0\n20000,1.0\n25000,1.0\n30000,1.0\n35000,1.0\n40000,1.0\n45000,1.0\n50000,1.0\n55000,1.0\n60000,1.0\n65000,1.0\n70000,1.0\n75000,1.0\n80000,1.0\n85000,1.0\n90000,1.0\n95000,1.0\n100000,1.0\n", csv_body);
assert_eq!("0,1.0\r\n5000,1.0\r\n10000,1.0\r\n15000,1.0\r\n20000,1.0\r\n25000,1.0\r\n30000,1.0\r\n35000,1.0\r\n40000,1.0\r\n45000,1.0\r\n50000,1.0\r\n55000,1.0\r\n60000,1.0\r\n65000,1.0\r\n70000,1.0\r\n75000,1.0\r\n80000,1.0\r\n85000,1.0\r\n90000,1.0\r\n95000,1.0\r\n100000,1.0\r\n", csv_body);
guard.remove_all().await;
}