mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-28 02:40:38 +00:00
* test: add physical plan wrapper trait * test: add plugins to datanode initialization * test: add plugins to datanode initialization * chore: add metrics method * chore: update meter-core version * chore: remove unused code * chore: impl metrics method on df execution plan adapter * chore: minor comment fix * chore: add retry in create table * chore: shrink keep lease handler buffer * chore: add etcd batch size warn * chore: try shrink * Revert "chore: try shrink" This reverts commit0361b51670. * chore: add create table backup time * add metrics in some interfaces Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * calc elapsed time and rows Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * chore: remove timer in scan batch * chore: add back stream metrics wrapper * chore: add timer to ready poll * chore: minor update * chore: try using df_plan.metrics() * chore: remove table scan timer * chore: remove scan timer * chore: add debug log * Revert "chore: add debug log" This reverts commit672a0138fd. * chore: use batch size as row count * chore: use batch size as row count * chore: tune code for pr * chore: rename to physical plan wrapper --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
131 lines
3.7 KiB
Rust
131 lines
3.7 KiB
Rust
// Copyright 2023 Greptime Team
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
mod instance_test;
|
|
mod promql_test;
|
|
mod test_util;
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
|
|
use catalog::RegisterSchemaRequest;
|
|
use common_test_util::temp_dir::TempDir;
|
|
use datanode::instance::Instance as DatanodeInstance;
|
|
use frontend::instance::Instance;
|
|
use table::engine::{region_name, table_dir};
|
|
|
|
use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
|
|
use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TempDirGuard, TestGuard};
|
|
|
|
pub struct MockDistributedInstance(GreptimeDbCluster);
|
|
|
|
impl MockDistributedInstance {
|
|
pub fn data_tmp_dirs(&self) -> Vec<&TempDir> {
|
|
self.0
|
|
.storage_guards
|
|
.iter()
|
|
.map(|g| {
|
|
let TempDirGuard::File(dir) = &g.0 else { unreachable!() };
|
|
dir
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
pub fn frontend(&self) -> Arc<Instance> {
|
|
self.0.frontend.clone()
|
|
}
|
|
|
|
pub fn datanodes(&self) -> &HashMap<u64, Arc<DatanodeInstance>> {
|
|
&self.0.datanode_instances
|
|
}
|
|
}
|
|
|
|
pub struct MockStandaloneInstance {
|
|
pub instance: Arc<Instance>,
|
|
_guard: TestGuard,
|
|
}
|
|
|
|
impl MockStandaloneInstance {
|
|
pub fn data_tmp_dir(&self) -> &TempDir {
|
|
let TempDirGuard::File(dir) = &self._guard.storage_guard.0 else { unreachable!() };
|
|
dir
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
|
|
let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
|
|
let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts, Default::default())
|
|
.await
|
|
.unwrap();
|
|
|
|
let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
|
|
.await
|
|
.unwrap();
|
|
|
|
dn_instance
|
|
.catalog_manager()
|
|
.register_catalog("another_catalog".to_string())
|
|
.await
|
|
.unwrap();
|
|
let req = RegisterSchemaRequest {
|
|
catalog: "another_catalog".to_string(),
|
|
schema: "another_schema".to_string(),
|
|
};
|
|
dn_instance
|
|
.catalog_manager()
|
|
.register_schema(req)
|
|
.await
|
|
.unwrap();
|
|
|
|
dn_instance.start().await.unwrap();
|
|
if let Some(heartbeat) = heartbeat {
|
|
heartbeat.start().await.unwrap();
|
|
};
|
|
MockStandaloneInstance {
|
|
instance: Arc::new(frontend_instance),
|
|
_guard: guard,
|
|
}
|
|
}
|
|
|
|
pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance {
|
|
let cluster = GreptimeDbClusterBuilder::new(test_name).build().await;
|
|
MockDistributedInstance(cluster)
|
|
}
|
|
|
|
pub fn test_region_dir(
|
|
dir: &str,
|
|
catalog_name: &str,
|
|
schema_name: &str,
|
|
table_id: u32,
|
|
region_id: u32,
|
|
) -> String {
|
|
let table_dir = table_dir(catalog_name, schema_name, table_id);
|
|
let region_name = region_name(table_id, region_id);
|
|
|
|
format!("{}/{}/{}", dir, table_dir, region_name)
|
|
}
|
|
|
|
pub fn has_parquet_file(sst_dir: &str) -> bool {
|
|
for entry in std::fs::read_dir(sst_dir).unwrap() {
|
|
let entry = entry.unwrap();
|
|
let path = entry.path();
|
|
if !path.is_dir() {
|
|
assert_eq!("parquet", path.extension().unwrap());
|
|
return true;
|
|
}
|
|
}
|
|
|
|
false
|
|
}
|