mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-16 13:00:40 +00:00
* feat: metric batch 2s PoC Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: max_concurrent_flushes Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: work channel size Signed-off-by: jeremyhi <fengjiachun@gmail.com> * feat(servers): add metrics and logs for pending rows batch flush Add the `FLUSH_ELAPSED` histogram metric to track the duration of pending rows batch flushes in the Prometheus store protocol handler. This provides better observability into the performance and latency of the batcher. Also update telemetry by: - Recording elapsed time for both successful and failed flush operations. - Adding an informational log upon successful flush including row count and duration. - Including elapsed time in error logs when a flush fails. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat(servers): implement columnar batching for pending rows Refactor PendingRowsBatcher to use columnar batching for the metrics store. Incoming RowInsertRequests are now converted to RecordBatches, partitioned, and flushed via BulkInsert requests to datanodes. - Enhance MultiDimPartitionRule to handle scalar boolean predicates. - Add metrics for tracking flush failures and dropped rows. - Update dependencies to support columnar batching in servers. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat(servers): add backpressure for pending rows Implement backpressure in PendingRowsBatcher by limiting in-flight requests with a semaphore and making the submission wait for the flush result. This ensures Prometheus write requests are throttled and only return once the data has been successfully flushed to datanodes. - Add max_inflight_requests to PromStoreOptions. - Use oneshot channels to notify submitters of flush completion. - Limit concurrent requests using a new inflight_semaphore. - Update PendingRowsBatcher::submit to wait for the flush outcome. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat: add stage-level metrics for bulk ingestion Introduce histograms to track the elapsed time of various stages in the metric engine bulk insert path and the server's pending rows batcher. This provides better observability into the performance bottlenecks of the ingestion pipeline. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * - `src/metric-engine/src/engine/bulk_insert.rs`: Removed the fallback mechanism that converted record batches to rows when bulk inserts were unsupported, along with related helper functions and unused imports. - `src/operator/src/insert.rs`: Removed an unused import (`common_time::TimeToLive::Instant`). Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat(servers): columnar Prom remote write Optimize the Prometheus remote write path by allowing direct conversion from decoded Prometheus samples to Arrow RecordBatches. This bypasses intermediate row-based representations when `PendingRowsBatcher` is active and no pipeline is used, improving ingestion efficiency. - Implement `as_record_batch_groups` in `TablesBuilder` and `PromWriteRequest`. - Add `submit_prom_record_batch_groups` to `PendingRowsBatcher`. - Introduce `DecodedPromWriteRequest` in `prom_store`. - Implement row-to-RecordBatch conversion logic in `prom_row_builder`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * Revert "feat(servers): columnar Prom remote write" This reverts commit efbb63c12a3e7fcec03858ea0351efd94fec8242. * refactor(servers): improve row to RecordBatch conversion - Use `snafu::ensure` for row validation in `rows_to_record_batch`. - Add explicit type hint for `MutableVector` to improve clarity. - Reorganize and clean up imports in `pending_rows_batcher.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * perf(servers): use arrow builders for row conversion This commit optimizes the conversion from `api::v1::Rows` to `RecordBatch` by using Arrow builders directly. This avoids the overhead of `MutableVector` and `common_recordbatch`, leading to better performance in the `pending_rows_batcher`. Additionally, the `#[allow(dead_code)]` attribute is removed from `modify_batch_sparse` in the metric engine as it is now utilized. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * perf(metric-engine): optimize batch modification Optimize `modify_batch_sparse` by reusing buffers, using Arrow builders, and employing fast-path encoding methods. This reduces allocations and avoids redundant downcasting and serializer overhead. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat/metric-engine-support-bulk: **Add Environment Variable for Batch Sync Control** - `pending_rows_batcher.rs`: Introduced an environment variable `PENDING_ROWS_BATCH_SYNC` to control the synchronization behavior of batch processing. If set to true, the function will wait for the flush result; otherwise, it will return immediatel with the total rows count. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * wip Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * chore: update and fix clippy Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix: failing test Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * picking-pending-rows-batcher: ### Commit Message Remove Unused Code and Simplify Error Handling - **`src/error.rs`**: Removed the `BatcherQueueFull` error variant and its associated logic, simplifying the error handling by removing unused code. - **`src/http/prom_store.rs`**: Eliminated the `try_decompress` function, streamlining the decompression logic by directly using `snappy_decompress` in `decode_remote_read_request`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * chore: parse PENDING_ROWS_BATCH_SYNC once Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * chore: revert unrelated changes Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * **Refactor Prometheus Write Handling** - **`prom_store.rs`**: Introduced `pre_write` method in `PromStoreProtocolHandler` to handle pre-write checks for Prometheus remote write requests. Updated `write` method to utilize `pre_write`. - **`server.rs`**: Modified `PendingRowsBatcher` initialization to conditionally create a batcher based on `with_metric_engine` flag. - **`http/prom_store.rs`**: Integrated `pre_write` checks before submitting requests to `PendingRowsBatcher`. - **`query_handler.rs`**: Added `pre_write` method to `PromStoreProtocolHandler` trait for pre-write operations. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * picking-pending-rows-batcher: - **Fix Label Typo**: Corrected a typo in the label value from `"flush_wn ite_region"` to `"flush_write_region"` in `pending_rows_batcher.rs`. - **Refactor Array Building Logic**: Introduced a macro `build_array!` to streamline the construction of `ArrayRef` for different data types, reducing code duplication in `pending_rows_batcher.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * format toml Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * picking-pending-rows-batcher: ### Update PromStore and PendingRowsBatcher Configuration - **`prom_store.rs`**: Set `pending_rows_flush_interval` to `Duration::ZERO` to disable automatic flushing. - **`pending_rows_batcher.rs`**: Enhance validation to disable the batcher when `flush_interval` is zero or configuration values like `max_batch_rows`, `max_concurrent_flushes`, `worker_channel_capacity`, or `max_inflight_requests` are zero, preventing potential panics or deadlocks. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * picking-pending-rows-batcher: ### Update `pending_rows_flush_interval` to Zero - **Files Modified**: - `src/frontend/src/service_config/prom_store.rs` - `tests-integration/tests/http.rs` - **Key Changes**: - Updated `pending_rows_flush_interval` from `Duration::from_secs(2)` to `Duration::ZERO` in `prom_store.rs`. - Changed `pending_rows_flush_interval` configuration from `"2s"` to `"0s"` in `http.rs`. These changes set the flush interval to zero, potentially affecting how frequently pending rows are flushed. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * picking-pending-rows-batcher: **Add Worker Management Enhancements** - **`metrics.rs`**: Introduced `PENDING_WORKERS` gauge to track active pending rows batch workers. - **`pending_rows_batcher.rs`**: - Added worker idle timeout logic with `WORKER_IDLE_TIMEOUT_MULTIPLIER`. - Implemented worker management functions: `spawn_worker`, `remove_worker_if_same_channel`, and `should_close_worker_on_idle_timeout`. - Enhanced worker lifecycle management to handle idle workers and ensure proper cleanup. - **Tests**: Added unit tests for worker removal and idle timeout logic. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix: clippy Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: jeremyhi <fengjiachun@gmail.com> Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> Co-authored-by: jeremyhi <fengjiachun@gmail.com>
947 lines
31 KiB
Rust
947 lines
31 KiB
Rust
// Copyright 2023 Greptime Team
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
use std::env;
|
|
use std::fmt::Display;
|
|
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
|
|
use auth::{DefaultPermissionChecker, PermissionCheckerRef, UserProviderRef};
|
|
use axum::Router;
|
|
use catalog::kvbackend::KvBackendCatalogManager;
|
|
use common_base::Plugins;
|
|
use common_config::Configurable;
|
|
use common_meta::key::catalog_name::CatalogNameKey;
|
|
use common_meta::key::schema_name::SchemaNameKey;
|
|
use common_query::Output;
|
|
use common_runtime::runtime::BuilderBuild;
|
|
use common_runtime::{Builder as RuntimeBuilder, Runtime};
|
|
use common_test_util::ports;
|
|
use common_test_util::temp_dir::{TempDir, create_temp_dir};
|
|
use common_wal::config::DatanodeWalConfig;
|
|
use datanode::config::{DatanodeOptions, StorageConfig};
|
|
use frontend::instance::Instance;
|
|
use frontend::service_config::{MysqlOptions, PostgresOptions};
|
|
use mito2::gc::GcConfig;
|
|
use object_store::config::{
|
|
AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
|
|
};
|
|
use object_store::services::{Azblob, Gcs, Oss, S3};
|
|
use object_store::test_util::TempFolder;
|
|
use object_store::{AzblobConnection, GcsConnection, ObjectStore, OssConnection, S3Connection};
|
|
use servers::grpc::builder::GrpcServerBuilder;
|
|
use servers::grpc::greptime_handler::GreptimeRequestHandler;
|
|
use servers::grpc::{FlightCompression, GrpcOptions, GrpcServer, GrpcServerConfig};
|
|
use servers::http::{HttpOptions, HttpServerBuilder};
|
|
use servers::metrics_handler::MetricsHandler;
|
|
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
|
|
use servers::otel_arrow::OtelArrowServiceHandler;
|
|
use servers::postgres::PostgresServer;
|
|
use servers::prom_remote_write::validation::PromValidationMode;
|
|
use servers::query_handler::sql::SqlQueryHandler;
|
|
use servers::request_memory_limiter::ServerMemoryLimiter;
|
|
use servers::server::Server;
|
|
use servers::tls::ReloadableTlsServerConfig;
|
|
use session::context::QueryContext;
|
|
|
|
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
|
|
|
|
pub const PEER_PLACEHOLDER_ADDR: &str = "127.0.0.1:3001";
|
|
|
|
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
|
|
pub enum StorageType {
|
|
S3,
|
|
S3WithCache,
|
|
File,
|
|
Oss,
|
|
Azblob,
|
|
Gcs,
|
|
}
|
|
|
|
impl Display for StorageType {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
StorageType::S3 => write!(f, "S3"),
|
|
StorageType::S3WithCache => write!(f, "S3"),
|
|
StorageType::File => write!(f, "File"),
|
|
StorageType::Oss => write!(f, "Oss"),
|
|
StorageType::Azblob => write!(f, "Azblob"),
|
|
StorageType::Gcs => write!(f, "Gcs"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl StorageType {
|
|
pub fn build_storage_types_based_on_env() -> Vec<StorageType> {
|
|
let mut storage_types = Vec::with_capacity(4);
|
|
storage_types.push(StorageType::File);
|
|
if let Ok(bucket) = env::var("GT_S3_BUCKET")
|
|
&& !bucket.is_empty()
|
|
{
|
|
storage_types.push(StorageType::S3);
|
|
}
|
|
if env::var("GT_OSS_BUCKET").is_ok() {
|
|
storage_types.push(StorageType::Oss);
|
|
}
|
|
if env::var("GT_AZBLOB_CONTAINER").is_ok() {
|
|
storage_types.push(StorageType::Azblob);
|
|
}
|
|
if env::var("GT_GCS_BUCKET").is_ok() {
|
|
storage_types.push(StorageType::Gcs);
|
|
}
|
|
storage_types
|
|
}
|
|
|
|
pub fn test_on(&self) -> bool {
|
|
let _ = dotenv::dotenv();
|
|
|
|
match self {
|
|
StorageType::File => true, // always test file
|
|
StorageType::S3 | StorageType::S3WithCache => {
|
|
if let Ok(b) = env::var("GT_S3_BUCKET") {
|
|
!b.is_empty()
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
StorageType::Oss => {
|
|
if let Ok(b) = env::var("GT_OSS_BUCKET") {
|
|
!b.is_empty()
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
StorageType::Azblob => {
|
|
if let Ok(b) = env::var("GT_AZBLOB_CONTAINER") {
|
|
!b.is_empty()
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
StorageType::Gcs => {
|
|
if let Ok(b) = env::var("GT_GCS_BUCKET") {
|
|
!b.is_empty()
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn s3_test_config() -> S3Config {
|
|
S3Config {
|
|
connection: S3Connection {
|
|
root: uuid::Uuid::new_v4().to_string(),
|
|
access_key_id: env::var("GT_S3_ACCESS_KEY_ID").unwrap().into(),
|
|
secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
|
|
bucket: env::var("GT_S3_BUCKET").unwrap(),
|
|
region: Some(env::var("GT_S3_REGION").unwrap()),
|
|
endpoint: env::var("GT_S3_ENDPOINT_URL").ok(),
|
|
..Default::default()
|
|
},
|
|
..Default::default()
|
|
}
|
|
}
|
|
|
|
pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, TempDirGuard) {
|
|
let _ = dotenv::dotenv();
|
|
|
|
match store_type {
|
|
StorageType::Gcs => {
|
|
let gcs_config = GcsConfig {
|
|
connection: GcsConnection {
|
|
root: uuid::Uuid::new_v4().to_string(),
|
|
bucket: env::var("GT_GCS_BUCKET").unwrap(),
|
|
scope: env::var("GT_GCS_SCOPE").unwrap(),
|
|
credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
|
|
credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
|
|
endpoint: env::var("GT_GCS_ENDPOINT").unwrap_or_default(),
|
|
},
|
|
..Default::default()
|
|
};
|
|
|
|
let builder = Gcs::from(&gcs_config.connection);
|
|
let config = ObjectStoreConfig::Gcs(gcs_config);
|
|
let store = ObjectStore::new(builder).unwrap().finish();
|
|
(config, TempDirGuard::Gcs(TempFolder::new(&store, "/")))
|
|
}
|
|
StorageType::Azblob => {
|
|
let azblob_config = AzblobConfig {
|
|
connection: AzblobConnection {
|
|
root: uuid::Uuid::new_v4().to_string(),
|
|
container: env::var("GT_AZBLOB_CONTAINER").unwrap(),
|
|
account_name: env::var("GT_AZBLOB_ACCOUNT_NAME").unwrap().into(),
|
|
account_key: env::var("GT_AZBLOB_ACCOUNT_KEY").unwrap().into(),
|
|
endpoint: env::var("GT_AZBLOB_ENDPOINT").unwrap(),
|
|
..Default::default()
|
|
},
|
|
..Default::default()
|
|
};
|
|
|
|
let builder = Azblob::from(&azblob_config.connection);
|
|
let config = ObjectStoreConfig::Azblob(azblob_config);
|
|
let store = ObjectStore::new(builder).unwrap().finish();
|
|
(config, TempDirGuard::Azblob(TempFolder::new(&store, "/")))
|
|
}
|
|
StorageType::Oss => {
|
|
let oss_config = OssConfig {
|
|
connection: OssConnection {
|
|
root: uuid::Uuid::new_v4().to_string(),
|
|
access_key_id: env::var("GT_OSS_ACCESS_KEY_ID").unwrap().into(),
|
|
access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap().into(),
|
|
bucket: env::var("GT_OSS_BUCKET").unwrap(),
|
|
endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
|
|
},
|
|
..Default::default()
|
|
};
|
|
|
|
let builder = Oss::from(&oss_config.connection);
|
|
let config = ObjectStoreConfig::Oss(oss_config);
|
|
let store = ObjectStore::new(builder).unwrap().finish();
|
|
(config, TempDirGuard::Oss(TempFolder::new(&store, "/")))
|
|
}
|
|
StorageType::S3 | StorageType::S3WithCache => {
|
|
let mut s3_config = s3_test_config();
|
|
|
|
if *store_type == StorageType::S3WithCache {
|
|
s3_config.cache.cache_path = "/tmp/greptimedb_cache".to_string();
|
|
} else {
|
|
s3_config.cache.enable_read_cache = false;
|
|
}
|
|
|
|
let builder = S3::from(&s3_config.connection);
|
|
let config = ObjectStoreConfig::S3(s3_config);
|
|
let store = ObjectStore::new(builder).unwrap().finish();
|
|
(config, TempDirGuard::S3(TempFolder::new(&store, "/")))
|
|
}
|
|
StorageType::File => (ObjectStoreConfig::File(FileConfig {}), TempDirGuard::None),
|
|
}
|
|
}
|
|
|
|
pub enum TempDirGuard {
|
|
None,
|
|
S3(TempFolder),
|
|
Oss(TempFolder),
|
|
Azblob(TempFolder),
|
|
Gcs(TempFolder),
|
|
}
|
|
|
|
pub struct TestGuard {
|
|
pub home_guard: FileDirGuard,
|
|
pub storage_guards: Vec<StorageGuard>,
|
|
}
|
|
|
|
pub struct FileDirGuard {
|
|
pub temp_dir: TempDir,
|
|
}
|
|
|
|
impl FileDirGuard {
|
|
pub fn new(temp_dir: TempDir) -> Self {
|
|
Self { temp_dir }
|
|
}
|
|
}
|
|
|
|
pub struct StorageGuard(pub TempDirGuard);
|
|
|
|
impl TestGuard {
|
|
pub async fn remove_all(&mut self) {
|
|
for storage_guard in self.storage_guards.iter_mut() {
|
|
if let TempDirGuard::S3(guard)
|
|
| TempDirGuard::Oss(guard)
|
|
| TempDirGuard::Azblob(guard)
|
|
| TempDirGuard::Gcs(guard) = &mut storage_guard.0
|
|
{
|
|
guard.remove_all().await.unwrap()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for TestGuard {
|
|
fn drop(&mut self) {
|
|
let (tx, rx) = std::sync::mpsc::channel();
|
|
|
|
let guards = std::mem::take(&mut self.storage_guards);
|
|
common_runtime::spawn_global(async move {
|
|
let mut errors = vec![];
|
|
for guard in guards {
|
|
if let TempDirGuard::S3(guard)
|
|
| TempDirGuard::Oss(guard)
|
|
| TempDirGuard::Azblob(guard)
|
|
| TempDirGuard::Gcs(guard) = guard.0
|
|
&& let Err(e) = guard.remove_all().await
|
|
{
|
|
errors.push(e);
|
|
}
|
|
}
|
|
if errors.is_empty() {
|
|
tx.send(Ok(())).unwrap();
|
|
} else {
|
|
tx.send(Err(errors)).unwrap();
|
|
}
|
|
});
|
|
rx.recv().unwrap().unwrap_or_else(|e| panic!("{:?}", e));
|
|
}
|
|
}
|
|
|
|
pub fn create_tmp_dir_and_datanode_opts(
|
|
default_store_type: StorageType,
|
|
store_provider_types: Vec<StorageType>,
|
|
name: &str,
|
|
wal_config: DatanodeWalConfig,
|
|
gc_config: GcConfig,
|
|
) -> (DatanodeOptions, TestGuard) {
|
|
let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
|
|
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
|
|
|
|
// Excludes the default object store.
|
|
let mut store_providers = Vec::with_capacity(store_provider_types.len());
|
|
// Includes the default object store.
|
|
let mut storage_guards = Vec::with_capacity(store_provider_types.len() + 1);
|
|
|
|
let (default_store, data_tmp_dir) = get_test_store_config(&default_store_type);
|
|
storage_guards.push(StorageGuard(data_tmp_dir));
|
|
|
|
for store_type in store_provider_types {
|
|
let (store, data_tmp_dir) = get_test_store_config(&store_type);
|
|
store_providers.push(store);
|
|
storage_guards.push(StorageGuard(data_tmp_dir))
|
|
}
|
|
let opts = create_datanode_opts(
|
|
default_store,
|
|
store_providers,
|
|
home_dir,
|
|
wal_config,
|
|
gc_config,
|
|
);
|
|
|
|
(
|
|
opts,
|
|
TestGuard {
|
|
home_guard: FileDirGuard::new(home_tmp_dir),
|
|
storage_guards,
|
|
},
|
|
)
|
|
}
|
|
|
|
pub(crate) fn create_datanode_opts(
|
|
default_store: ObjectStoreConfig,
|
|
providers: Vec<ObjectStoreConfig>,
|
|
home_dir: String,
|
|
wal_config: DatanodeWalConfig,
|
|
gc_config: GcConfig,
|
|
) -> DatanodeOptions {
|
|
let region_engine = DatanodeOptions::default()
|
|
.region_engine
|
|
.into_iter()
|
|
.map(|mut v| {
|
|
if let datanode::config::RegionEngineConfig::Mito(mito_config) = &mut v {
|
|
mito_config.gc = gc_config.clone();
|
|
}
|
|
v
|
|
})
|
|
.collect();
|
|
DatanodeOptions {
|
|
node_id: Some(0),
|
|
require_lease_before_startup: true,
|
|
storage: StorageConfig {
|
|
data_home: home_dir,
|
|
providers,
|
|
store: default_store,
|
|
},
|
|
grpc: GrpcOptions::default()
|
|
.with_bind_addr(PEER_PLACEHOLDER_ADDR)
|
|
.with_server_addr(PEER_PLACEHOLDER_ADDR),
|
|
wal: wal_config,
|
|
region_engine,
|
|
..Default::default()
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn create_test_table(instance: &Instance, table_name: &str) {
|
|
let sql = format!(
|
|
r#"
|
|
CREATE TABLE IF NOT EXISTS {table_name} (
|
|
host String NOT NULL PRIMARY KEY,
|
|
cpu DOUBLE NULL,
|
|
memory DOUBLE NULL,
|
|
ts TIMESTAMP NOT NULL TIME INDEX,
|
|
)
|
|
"#
|
|
);
|
|
|
|
let result = instance.do_query(&sql, QueryContext::arc()).await;
|
|
let _ = result.first().unwrap().as_ref().unwrap();
|
|
}
|
|
|
|
async fn setup_standalone_instance(
|
|
test_name: &str,
|
|
store_type: StorageType,
|
|
) -> GreptimeDbStandalone {
|
|
GreptimeDbStandaloneBuilder::new(test_name)
|
|
.with_default_store_type(store_type)
|
|
.build()
|
|
.await
|
|
}
|
|
|
|
async fn setup_standalone_instance_with_slow_query_threshold(
|
|
test_name: &str,
|
|
store_type: StorageType,
|
|
slow_query_threshold: std::time::Duration,
|
|
) -> GreptimeDbStandalone {
|
|
GreptimeDbStandaloneBuilder::new(test_name)
|
|
.with_default_store_type(store_type)
|
|
.with_slow_query_threshold(slow_query_threshold)
|
|
.build()
|
|
.await
|
|
}
|
|
|
|
async fn setup_standalone_instance_with_plugins(
|
|
test_name: &str,
|
|
store_type: StorageType,
|
|
plugins: Plugins,
|
|
) -> GreptimeDbStandalone {
|
|
GreptimeDbStandaloneBuilder::new(test_name)
|
|
.with_default_store_type(store_type)
|
|
.with_plugin(plugins)
|
|
.build()
|
|
.await
|
|
}
|
|
|
|
async fn setup_standalone_instance_with_plugins_and_slow_query_threshold(
|
|
test_name: &str,
|
|
store_type: StorageType,
|
|
plugins: Plugins,
|
|
slow_query_threshold: std::time::Duration,
|
|
) -> GreptimeDbStandalone {
|
|
GreptimeDbStandaloneBuilder::new(test_name)
|
|
.with_default_store_type(store_type)
|
|
.with_plugin(plugins)
|
|
.with_slow_query_threshold(slow_query_threshold)
|
|
.build()
|
|
.await
|
|
}
|
|
|
|
pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) {
|
|
let instance = setup_standalone_instance(name, store_type).await;
|
|
|
|
let http_opts = HttpOptions {
|
|
addr: format!("127.0.0.1:{}", ports::get_port()),
|
|
..Default::default()
|
|
};
|
|
let http_server = HttpServerBuilder::new(http_opts)
|
|
.with_sql_handler(instance.fe_instance().clone())
|
|
.with_logs_handler(instance.fe_instance().clone())
|
|
.with_metrics_handler(MetricsHandler)
|
|
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
|
|
.build();
|
|
(
|
|
http_server.build(http_server.make_app()).unwrap(),
|
|
instance.guard,
|
|
)
|
|
}
|
|
|
|
pub async fn setup_test_http_app_with_frontend(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
) -> (Router, TestGuard) {
|
|
setup_test_http_app_with_frontend_and_user_provider(store_type, name, None).await
|
|
}
|
|
|
|
pub async fn setup_test_http_app_with_frontend_and_slow_query_threshold(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
slow_query_threshold: std::time::Duration,
|
|
) -> (Router, TestGuard) {
|
|
let instance =
|
|
setup_standalone_instance_with_slow_query_threshold(name, store_type, slow_query_threshold)
|
|
.await;
|
|
|
|
create_test_table(instance.fe_instance(), "demo").await;
|
|
|
|
let http_opts = HttpOptions {
|
|
addr: format!("127.0.0.1:{}", ports::get_port()),
|
|
..Default::default()
|
|
};
|
|
|
|
let http_server = HttpServerBuilder::new(http_opts)
|
|
.with_sql_handler(instance.fe_instance().clone())
|
|
.with_log_ingest_handler(instance.fe_instance().clone(), None, None)
|
|
.with_logs_handler(instance.fe_instance().clone())
|
|
.with_influxdb_handler(instance.fe_instance().clone())
|
|
.with_otlp_handler(instance.fe_instance().clone(), true)
|
|
.with_jaeger_handler(instance.fe_instance().clone())
|
|
.with_greptime_config_options(instance.opts.to_toml().unwrap())
|
|
.build();
|
|
|
|
let app = http_server.build(http_server.make_app()).unwrap();
|
|
(app, instance.guard)
|
|
}
|
|
|
|
pub async fn setup_test_http_app_with_frontend_and_user_provider(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
user_provider: Option<UserProviderRef>,
|
|
) -> (Router, TestGuard) {
|
|
setup_test_http_app_with_frontend_and_custom_options(
|
|
store_type,
|
|
name,
|
|
user_provider,
|
|
None,
|
|
None,
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub async fn setup_test_http_app_with_frontend_and_custom_options(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
user_provider: Option<UserProviderRef>,
|
|
http_opts: Option<HttpOptions>,
|
|
memory_limiter: Option<ServerMemoryLimiter>,
|
|
) -> (Router, TestGuard) {
|
|
let plugins = Plugins::new();
|
|
if let Some(user_provider) = user_provider.clone() {
|
|
plugins.insert::<UserProviderRef>(user_provider.clone());
|
|
plugins.insert::<PermissionCheckerRef>(DefaultPermissionChecker::arc());
|
|
}
|
|
|
|
let instance = setup_standalone_instance_with_plugins(name, store_type, plugins).await;
|
|
|
|
create_test_table(instance.fe_instance(), "demo").await;
|
|
|
|
let http_opts = http_opts.unwrap_or_else(|| HttpOptions {
|
|
addr: format!("127.0.0.1:{}", ports::get_port()),
|
|
..Default::default()
|
|
});
|
|
|
|
let mut http_server = HttpServerBuilder::new(http_opts)
|
|
.with_sql_handler(instance.fe_instance().clone())
|
|
.with_log_ingest_handler(instance.fe_instance().clone(), None, None)
|
|
.with_logs_handler(instance.fe_instance().clone())
|
|
.with_influxdb_handler(instance.fe_instance().clone())
|
|
.with_otlp_handler(instance.fe_instance().clone(), true)
|
|
.with_jaeger_handler(instance.fe_instance().clone())
|
|
.with_dashboard_handler(instance.fe_instance().clone())
|
|
.with_greptime_config_options(instance.opts.to_toml().unwrap());
|
|
|
|
if let Some(user_provider) = user_provider {
|
|
http_server = http_server.with_user_provider(user_provider);
|
|
}
|
|
|
|
if let Some(limiter) = memory_limiter {
|
|
http_server = http_server.with_memory_limiter(limiter);
|
|
}
|
|
|
|
let http_server = http_server.build();
|
|
|
|
let app = http_server.build(http_server.make_app()).unwrap();
|
|
(app, instance.guard)
|
|
}
|
|
|
|
async fn run_sql(sql: &str, instance: &GreptimeDbStandalone) {
|
|
let result = instance
|
|
.fe_instance()
|
|
.do_query(sql, QueryContext::arc())
|
|
.await;
|
|
let _ = result.first().unwrap().as_ref().unwrap();
|
|
}
|
|
|
|
pub async fn setup_test_prom_app_with_frontend(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
) -> (Router, TestGuard) {
|
|
unsafe {
|
|
std::env::set_var("TZ", "UTC");
|
|
}
|
|
|
|
let instance = setup_standalone_instance(name, store_type).await;
|
|
|
|
// build physical table
|
|
let sql = "CREATE TABLE phy (ts timestamp time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
|
|
run_sql(sql, &instance).await;
|
|
let sql = "CREATE TABLE phy_ns (ts timestamp(0) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
|
|
run_sql(sql, &instance).await;
|
|
// build metric tables
|
|
let sql = "CREATE TABLE demo (ts timestamp time index, val double, host string primary key) engine=metric with ('on_physical_table' = 'phy')";
|
|
run_sql(sql, &instance).await;
|
|
let sql = "CREATE TABLE demo_metrics (ts timestamp time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy')";
|
|
run_sql(sql, &instance).await;
|
|
let sql = "CREATE TABLE multi_labels (ts timestamp(0) time index, val double, idc string, env string, host string, primary key (idc, env, host)) engine=metric with ('on_physical_table' = 'phy_ns')";
|
|
run_sql(sql, &instance).await;
|
|
|
|
// insert rows
|
|
let sql = "INSERT INTO demo(host, val, ts) VALUES ('host1', 1.1, 0), ('host2', 2.1, 600000)";
|
|
run_sql(sql, &instance).await;
|
|
let sql =
|
|
"INSERT INTO demo_metrics(idc, val, ts) VALUES ('idc1', 1.1, 0), ('idc2', 2.1, 600000)";
|
|
run_sql(sql, &instance).await;
|
|
// insert a row with empty label
|
|
let sql = "INSERT INTO demo_metrics(val, ts) VALUES (1.1, 0)";
|
|
run_sql(sql, &instance).await;
|
|
|
|
// insert rows to multi_labels
|
|
let sql = "INSERT INTO multi_labels(idc, env, host, val, ts) VALUES ('idc1', 'dev', 'host1', 1.1, 0), ('idc1', 'dev', 'host2', 2.1, 0), ('idc2', 'dev', 'host1', 1.1, 0), ('idc2', 'test', 'host3', 2.1, 0)";
|
|
run_sql(sql, &instance).await;
|
|
|
|
// build physical table
|
|
let sql = "CREATE TABLE phy2 (ts timestamp(9) time index, val double, host string primary key) engine=metric with ('physical_metric_table' = '')";
|
|
run_sql(sql, &instance).await;
|
|
let sql = "CREATE TABLE demo_metrics_with_nanos(ts timestamp(9) time index, val double, idc string primary key) engine=metric with ('on_physical_table' = 'phy2')";
|
|
run_sql(sql, &instance).await;
|
|
let sql = "INSERT INTO demo_metrics_with_nanos(idc, val, ts) VALUES ('idc1', 1.1, 0)";
|
|
run_sql(sql, &instance).await;
|
|
|
|
// a mito table with non-prometheus compatible values
|
|
let sql = "CREATE TABLE mito (ts timestamp(9) time index, val double, host bigint primary key) engine=mito";
|
|
run_sql(sql, &instance).await;
|
|
let sql = "INSERT INTO mito(host, val, ts) VALUES (1, 1.1, 0)";
|
|
run_sql(sql, &instance).await;
|
|
|
|
let http_opts = HttpOptions {
|
|
addr: format!("127.0.0.1:{}", ports::get_port()),
|
|
..Default::default()
|
|
};
|
|
let frontend_ref = instance.fe_instance().clone();
|
|
let http_server = HttpServerBuilder::new(http_opts)
|
|
.with_sql_handler(frontend_ref.clone())
|
|
.with_logs_handler(instance.fe_instance().clone())
|
|
.with_prom_handler(
|
|
frontend_ref.clone(),
|
|
Some(frontend_ref.clone()),
|
|
true,
|
|
PromValidationMode::Strict,
|
|
None,
|
|
)
|
|
.with_prometheus_handler(frontend_ref)
|
|
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
|
|
.build();
|
|
let app = http_server.build(http_server.make_app()).unwrap();
|
|
(app, instance.guard)
|
|
}
|
|
|
|
pub async fn setup_grpc_server(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
|
|
setup_grpc_server_with(store_type, name, None, None, None).await
|
|
}
|
|
|
|
pub async fn setup_grpc_server_with_user_provider(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
user_provider: Option<UserProviderRef>,
|
|
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
|
|
setup_grpc_server_with(store_type, name, user_provider, None, None).await
|
|
}
|
|
|
|
pub async fn setup_grpc_server_with(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
user_provider: Option<UserProviderRef>,
|
|
grpc_config: Option<GrpcServerConfig>,
|
|
memory_limiter: Option<servers::request_memory_limiter::ServerMemoryLimiter>,
|
|
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
|
|
let instance = setup_standalone_instance(name, store_type).await;
|
|
|
|
let runtime: Runtime = RuntimeBuilder::default()
|
|
.worker_threads(2)
|
|
.thread_name("grpc-handlers")
|
|
.build()
|
|
.unwrap();
|
|
|
|
let fe_instance_ref = instance.fe_instance().clone();
|
|
|
|
let greptime_request_handler = GreptimeRequestHandler::new(
|
|
fe_instance_ref.clone(),
|
|
user_provider.clone(),
|
|
Some(runtime.clone()),
|
|
FlightCompression::default(),
|
|
);
|
|
|
|
let flight_handler = Arc::new(greptime_request_handler.clone());
|
|
|
|
let grpc_config = grpc_config.unwrap_or_default();
|
|
let mut grpc_builder = GrpcServerBuilder::new(grpc_config.clone(), runtime);
|
|
|
|
if let Some(limiter) = memory_limiter {
|
|
grpc_builder = grpc_builder.with_memory_limiter(limiter);
|
|
}
|
|
|
|
let grpc_builder = grpc_builder
|
|
.database_handler(greptime_request_handler)
|
|
.flight_handler(flight_handler)
|
|
.prometheus_handler(fe_instance_ref.clone(), user_provider.clone())
|
|
.otel_arrow_handler(OtelArrowServiceHandler::new(fe_instance_ref, user_provider))
|
|
.with_tls_config(grpc_config.tls)
|
|
.unwrap();
|
|
|
|
let mut grpc_server = grpc_builder.build();
|
|
|
|
let fe_grpc_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
|
|
grpc_server.start(fe_grpc_addr).await.unwrap();
|
|
|
|
(instance, Arc::new(grpc_server))
|
|
}
|
|
|
|
pub async fn setup_mysql_server(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
) -> (TestGuard, Arc<Box<dyn Server>>) {
|
|
setup_mysql_server_with_user_provider(store_type, name, None).await
|
|
}
|
|
|
|
pub async fn setup_mysql_server_with_slow_query_threshold(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
slow_query_threshold: std::time::Duration,
|
|
) -> (TestGuard, Arc<Box<dyn Server>>) {
|
|
let plugins = Plugins::new();
|
|
let instance = setup_standalone_instance_with_plugins_and_slow_query_threshold(
|
|
name,
|
|
store_type,
|
|
plugins,
|
|
slow_query_threshold,
|
|
)
|
|
.await;
|
|
|
|
let runtime = RuntimeBuilder::default()
|
|
.worker_threads(2)
|
|
.thread_name("mysql-runtime")
|
|
.build()
|
|
.unwrap();
|
|
|
|
let fe_mysql_addr = format!("127.0.0.1:{}", ports::get_port());
|
|
|
|
let fe_instance_ref = instance.fe_instance().clone();
|
|
let opts = MysqlOptions {
|
|
addr: fe_mysql_addr.clone(),
|
|
..Default::default()
|
|
};
|
|
let mut mysql_server = MysqlServer::create_server(
|
|
runtime,
|
|
Arc::new(MysqlSpawnRef::new(fe_instance_ref, None)),
|
|
Arc::new(MysqlSpawnConfig::new(
|
|
false,
|
|
Arc::new(
|
|
ReloadableTlsServerConfig::try_new(opts.tls.clone())
|
|
.expect("Failed to load certificates and keys"),
|
|
),
|
|
0,
|
|
opts.reject_no_database.unwrap_or(false),
|
|
opts.prepared_stmt_cache_size,
|
|
)),
|
|
None,
|
|
);
|
|
|
|
mysql_server
|
|
.start(fe_mysql_addr.parse::<SocketAddr>().unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
(instance.guard, Arc::new(mysql_server))
|
|
}
|
|
|
|
pub async fn setup_mysql_server_with_user_provider(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
user_provider: Option<UserProviderRef>,
|
|
) -> (TestGuard, Arc<Box<dyn Server>>) {
|
|
let plugins = Plugins::new();
|
|
if let Some(user_provider) = user_provider.clone() {
|
|
plugins.insert::<UserProviderRef>(user_provider.clone());
|
|
plugins.insert::<PermissionCheckerRef>(DefaultPermissionChecker::arc());
|
|
}
|
|
|
|
let instance = setup_standalone_instance_with_plugins(name, store_type, plugins).await;
|
|
|
|
let runtime = RuntimeBuilder::default()
|
|
.worker_threads(2)
|
|
.thread_name("mysql-runtime")
|
|
.build()
|
|
.unwrap();
|
|
|
|
let fe_mysql_addr = format!("127.0.0.1:{}", ports::get_port());
|
|
|
|
let fe_instance_ref = instance.fe_instance().clone();
|
|
let opts = MysqlOptions {
|
|
addr: fe_mysql_addr.clone(),
|
|
..Default::default()
|
|
};
|
|
let mut mysql_server = MysqlServer::create_server(
|
|
runtime,
|
|
Arc::new(MysqlSpawnRef::new(fe_instance_ref, user_provider)),
|
|
Arc::new(MysqlSpawnConfig::new(
|
|
false,
|
|
Arc::new(
|
|
ReloadableTlsServerConfig::try_new(opts.tls.clone())
|
|
.expect("Failed to load certificates and keys"),
|
|
),
|
|
0,
|
|
opts.reject_no_database.unwrap_or(false),
|
|
opts.prepared_stmt_cache_size,
|
|
)),
|
|
None,
|
|
);
|
|
|
|
mysql_server
|
|
.start(fe_mysql_addr.parse::<SocketAddr>().unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
(instance.guard, Arc::new(mysql_server))
|
|
}
|
|
|
|
pub async fn setup_pg_server(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
) -> (TestGuard, Arc<Box<dyn Server>>) {
|
|
setup_pg_server_with_user_provider(store_type, name, None).await
|
|
}
|
|
|
|
pub async fn setup_pg_server_with_slow_query_threshold(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
slow_query_threshold: std::time::Duration,
|
|
) -> (TestGuard, Arc<Box<dyn Server>>) {
|
|
let instance =
|
|
setup_standalone_instance_with_slow_query_threshold(name, store_type, slow_query_threshold)
|
|
.await;
|
|
|
|
let runtime = RuntimeBuilder::default()
|
|
.worker_threads(2)
|
|
.thread_name("pg-runtime")
|
|
.build()
|
|
.unwrap();
|
|
|
|
let fe_pg_addr = format!("127.0.0.1:{}", ports::get_port());
|
|
|
|
let fe_instance_ref = instance.fe_instance().clone();
|
|
let opts = PostgresOptions {
|
|
addr: fe_pg_addr.clone(),
|
|
..Default::default()
|
|
};
|
|
let tls_server_config = Arc::new(
|
|
ReloadableTlsServerConfig::try_new(opts.tls.clone())
|
|
.expect("Failed to load certificates and keys"),
|
|
);
|
|
|
|
let mut pg_server = Box::new(PostgresServer::new(
|
|
fe_instance_ref,
|
|
opts.tls.should_force_tls(),
|
|
tls_server_config,
|
|
0,
|
|
runtime,
|
|
None,
|
|
None,
|
|
));
|
|
|
|
pg_server
|
|
.start(fe_pg_addr.parse::<SocketAddr>().unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
(instance.guard, Arc::new(pg_server))
|
|
}
|
|
|
|
pub async fn setup_pg_server_with_user_provider(
|
|
store_type: StorageType,
|
|
name: &str,
|
|
user_provider: Option<UserProviderRef>,
|
|
) -> (TestGuard, Arc<Box<dyn Server>>) {
|
|
let instance = setup_standalone_instance(name, store_type).await;
|
|
|
|
let runtime = RuntimeBuilder::default()
|
|
.worker_threads(2)
|
|
.thread_name("pg-runtime")
|
|
.build()
|
|
.unwrap();
|
|
|
|
let fe_pg_addr = format!("127.0.0.1:{}", ports::get_port());
|
|
|
|
let fe_instance_ref = instance.fe_instance().clone();
|
|
let opts = PostgresOptions {
|
|
addr: fe_pg_addr.clone(),
|
|
..Default::default()
|
|
};
|
|
let tls_server_config = Arc::new(
|
|
ReloadableTlsServerConfig::try_new(opts.tls.clone())
|
|
.expect("Failed to load certificates and keys"),
|
|
);
|
|
|
|
let mut pg_server = Box::new(PostgresServer::new(
|
|
fe_instance_ref,
|
|
opts.tls.should_force_tls(),
|
|
tls_server_config,
|
|
0,
|
|
runtime,
|
|
user_provider,
|
|
None,
|
|
));
|
|
|
|
pg_server
|
|
.start(fe_pg_addr.parse::<SocketAddr>().unwrap())
|
|
.await
|
|
.unwrap();
|
|
|
|
(instance.guard, Arc::new(pg_server))
|
|
}
|
|
|
|
pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) {
|
|
let catalog_manager = instance
|
|
.catalog_manager()
|
|
.as_any()
|
|
.downcast_ref::<KvBackendCatalogManager>()
|
|
.unwrap();
|
|
|
|
let table_metadata_manager = catalog_manager.table_metadata_manager_ref();
|
|
table_metadata_manager
|
|
.catalog_manager()
|
|
.create(CatalogNameKey::new("another_catalog"), true)
|
|
.await
|
|
.unwrap();
|
|
table_metadata_manager
|
|
.schema_manager()
|
|
.create(
|
|
SchemaNameKey::new("another_catalog", "another_schema"),
|
|
None,
|
|
true,
|
|
)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
pub async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
|
|
SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
|
|
.await
|
|
.remove(0)
|
|
.unwrap()
|
|
}
|
|
|
|
pub async fn try_execute_sql(
|
|
instance: &Arc<Instance>,
|
|
sql: &str,
|
|
) -> servers::error::Result<Output> {
|
|
SqlQueryHandler::do_query(instance.as_ref(), sql, QueryContext::arc())
|
|
.await
|
|
.remove(0)
|
|
}
|
|
|
|
pub async fn execute_sql_and_expect(instance: &Arc<Instance>, sql: &str, expected: &str) {
|
|
let output = execute_sql(instance, sql).await;
|
|
let output = output.data.pretty_print().await;
|
|
assert_eq!(output, expected.trim());
|
|
}
|