mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 22:32:55 +00:00
Compare commits
5 Commits
v0.5.0-nig
...
bench_flow
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5ce678fbf4 | ||
|
|
2e101bad53 | ||
|
|
7d4009e51f | ||
|
|
1a1c093ad7 | ||
|
|
0badb3715e |
@@ -134,7 +134,7 @@ worker_request_batch_size = 64
|
||||
# Number of meta action updated to trigger a new checkpoint for the manifest
|
||||
manifest_checkpoint_distance = 10
|
||||
# Manifest compression type
|
||||
manifest_compress_type = "Uncompressed"
|
||||
manifest_compress_type = "uncompressed"
|
||||
# Max number of running background jobs
|
||||
max_background_jobs = 4
|
||||
# Interval to auto flush a region if it has not flushed yet.
|
||||
@@ -162,3 +162,5 @@ sst_write_buffer_size = "8MB"
|
||||
# enable_otlp_tracing = false
|
||||
# tracing exporter endpoint with format `ip:port`, we use grpc oltp as exporter, default endpoint is `localhost:4317`
|
||||
# otlp_endpoint = "localhost:4317"
|
||||
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
|
||||
# tracing_sample_ratio = 1.0
|
||||
|
||||
@@ -20,6 +20,7 @@ use once_cell::sync::Lazy;
|
||||
use opentelemetry::{global, KeyValue};
|
||||
use opentelemetry_otlp::WithExportConfig;
|
||||
use opentelemetry_sdk::propagation::TraceContextPropagator;
|
||||
use opentelemetry_sdk::trace::Sampler;
|
||||
use opentelemetry_semantic_conventions::resource;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
@@ -34,15 +35,28 @@ pub use crate::{debug, error, info, trace, warn};
|
||||
|
||||
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct LoggingOptions {
|
||||
pub dir: String,
|
||||
pub level: Option<String>,
|
||||
pub enable_otlp_tracing: bool,
|
||||
pub otlp_endpoint: Option<String>,
|
||||
pub tracing_sample_ratio: Option<f64>,
|
||||
}
|
||||
|
||||
impl PartialEq for LoggingOptions {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.dir == other.dir
|
||||
&& self.level == other.level
|
||||
&& self.enable_otlp_tracing == other.enable_otlp_tracing
|
||||
&& self.otlp_endpoint == other.otlp_endpoint
|
||||
&& self.tracing_sample_ratio == other.tracing_sample_ratio
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for LoggingOptions {}
|
||||
|
||||
impl Default for LoggingOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -50,6 +64,7 @@ impl Default for LoggingOptions {
|
||||
level: None,
|
||||
enable_otlp_tracing: false,
|
||||
otlp_endpoint: None,
|
||||
tracing_sample_ratio: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -145,7 +160,10 @@ pub fn init_global_logging(
|
||||
let filter = targets_string
|
||||
.parse::<filter::Targets>()
|
||||
.expect("error parsing log level string");
|
||||
|
||||
let sampler = opts
|
||||
.tracing_sample_ratio
|
||||
.map(Sampler::TraceIdRatioBased)
|
||||
.unwrap_or(Sampler::AlwaysOn);
|
||||
// Must enable 'tokio_unstable' cfg to use this feature.
|
||||
// For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
|
||||
#[cfg(feature = "tokio-console")]
|
||||
@@ -200,17 +218,19 @@ pub fn init_global_logging(
|
||||
.unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
|
||||
),
|
||||
)
|
||||
.with_trace_config(opentelemetry_sdk::trace::config().with_resource(
|
||||
opentelemetry_sdk::Resource::new(vec![
|
||||
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
|
||||
KeyValue::new(
|
||||
resource::SERVICE_INSTANCE_ID,
|
||||
node_id.unwrap_or("none".to_string()),
|
||||
),
|
||||
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
|
||||
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
|
||||
]),
|
||||
))
|
||||
.with_trace_config(
|
||||
opentelemetry_sdk::trace::config()
|
||||
.with_sampler(sampler)
|
||||
.with_resource(opentelemetry_sdk::Resource::new(vec![
|
||||
KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
|
||||
KeyValue::new(
|
||||
resource::SERVICE_INSTANCE_ID,
|
||||
node_id.unwrap_or("none".to_string()),
|
||||
),
|
||||
KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
|
||||
KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
|
||||
])),
|
||||
)
|
||||
.install_batch(opentelemetry_sdk::runtime::Tokio)
|
||||
.expect("otlp tracer install failed");
|
||||
let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
|
||||
|
||||
@@ -44,6 +44,7 @@ pub struct FrontendOptions {
|
||||
pub logging: LoggingOptions,
|
||||
pub datanode: DatanodeOptions,
|
||||
pub user_provider: Option<String>,
|
||||
pub flow_grpc_addr: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for FrontendOptions {
|
||||
@@ -64,6 +65,7 @@ impl Default for FrontendOptions {
|
||||
logging: LoggingOptions::default(),
|
||||
datanode: DatanodeOptions::default(),
|
||||
user_provider: None,
|
||||
flow_grpc_addr: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ pub mod standalone;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::greptime_database_client::GreptimeDatabaseClient;
|
||||
use api::v1::meta::Role;
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
@@ -82,6 +83,8 @@ use sql::statements::copy::CopyTable;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::ObjectName;
|
||||
pub use standalone::StandaloneDatanodeManager;
|
||||
use tokio::sync::Mutex;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use self::region_query::FrontendRegionQueryHandler;
|
||||
use self::standalone::StandaloneTableMetadataCreator;
|
||||
@@ -117,6 +120,12 @@ pub trait FrontendInstance:
|
||||
pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
|
||||
pub type StatementExecutorRef = Arc<StatementExecutor>;
|
||||
|
||||
/// Send certain query to flow worker through grpc
|
||||
pub struct FlowProxy {
|
||||
flow_client: Mutex<GreptimeDatabaseClient<Channel>>,
|
||||
}
|
||||
pub type FlowProxyRef = Arc<FlowProxy>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Instance {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
@@ -128,6 +137,7 @@ pub struct Instance {
|
||||
heartbeat_task: Option<HeartbeatTask>,
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
flow: Option<FlowProxyRef>,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
@@ -219,6 +229,7 @@ impl Instance {
|
||||
heartbeat_task,
|
||||
inserter,
|
||||
deleter,
|
||||
flow: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -345,6 +356,11 @@ impl Instance {
|
||||
inserter.clone(),
|
||||
));
|
||||
|
||||
let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string());
|
||||
let conn = tonic::transport::Endpoint::new(addr)
|
||||
.unwrap()
|
||||
.connect_lazy();
|
||||
let client = GreptimeDatabaseClient::new(conn);
|
||||
Ok(Instance {
|
||||
catalog_manager: catalog_manager.clone(),
|
||||
script_executor,
|
||||
@@ -355,6 +371,12 @@ impl Instance {
|
||||
heartbeat_task: None,
|
||||
inserter,
|
||||
deleter,
|
||||
flow: Some(
|
||||
FlowProxy {
|
||||
flow_client: Mutex::new(client),
|
||||
}
|
||||
.into(),
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -15,11 +15,14 @@
|
||||
use api::v1::ddl_request::{Expr as DdlExpr, Expr};
|
||||
use api::v1::greptime_request::Request;
|
||||
use api::v1::query_request::Query;
|
||||
use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
|
||||
use api::v1::{
|
||||
DeleteRequests, GreptimeRequest, InsertRequests, RowDeleteRequests, RowInsertRequests,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use common_meta::table_name::TableName;
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
use query::parser::PromQuery;
|
||||
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
|
||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||
@@ -47,6 +50,27 @@ impl GrpcQueryHandler for Instance {
|
||||
.as_ref()
|
||||
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
|
||||
.context(PermissionSnafu)?;
|
||||
/*
|
||||
// copy row inserts to flow worker
|
||||
if let Request::RowInserts(_) = &request {
|
||||
let full_req = GreptimeRequest {
|
||||
header: None,
|
||||
request: Some(request.clone()),
|
||||
};
|
||||
|
||||
if let Some(flow_proxy) = &self.flow {
|
||||
flow_proxy
|
||||
.flow_client
|
||||
.lock()
|
||||
.await
|
||||
.handle(full_req)
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
info!("flow proxy is not initialized");
|
||||
}
|
||||
};
|
||||
let output = Output::AffectedRows(0);*/
|
||||
|
||||
let output = match request {
|
||||
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
|
||||
@@ -187,10 +211,24 @@ impl Instance {
|
||||
requests: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let full_req = GreptimeRequest {
|
||||
header: None,
|
||||
request: Some(Request::RowInserts(requests.clone())),
|
||||
};
|
||||
|
||||
if let Some(flow_proxy) = &self.flow {
|
||||
let mut client = flow_proxy.flow_client.lock().await.clone();
|
||||
client.handle(full_req).await.unwrap();
|
||||
} else {
|
||||
info!("flow proxy is not initialized");
|
||||
}
|
||||
Ok(Output::AffectedRows(0))
|
||||
/*
|
||||
self.inserter
|
||||
.handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
|
||||
.await
|
||||
.context(TableOperationSnafu)
|
||||
*/
|
||||
}
|
||||
|
||||
pub async fn handle_deletes(
|
||||
|
||||
Reference in New Issue
Block a user