Compare commits

...

5 Commits

Author SHA1 Message Date
Discord9
5ce678fbf4 clone client instead 2023-12-05 11:31:14 +08:00
Discord9
2e101bad53 feat: patch to resend insert to flow worker 2023-12-05 10:25:28 +08:00
Discord9
7d4009e51f dbg: comment out handle request for test 2023-11-30 15:02:40 +08:00
Discord9
1a1c093ad7 feat: resend req to flow 2023-11-30 14:08:58 +08:00
WU Jingdi
0badb3715e feat: support sample ratio in trace (#2809)
* feat: support sample ratio in trace

* chore: fix code advice
2023-11-27 06:46:46 +00:00
5 changed files with 99 additions and 15 deletions

View File

@@ -134,7 +134,7 @@ worker_request_batch_size = 64
 # Number of meta action updated to trigger a new checkpoint for the manifest
 manifest_checkpoint_distance = 10
 # Manifest compression type
-manifest_compress_type = "Uncompressed"
+manifest_compress_type = "uncompressed"
 # Max number of running background jobs
 max_background_jobs = 4
 # Interval to auto flush a region if it has not flushed yet.
@@ -162,3 +162,5 @@ sst_write_buffer_size = "8MB"
 # enable_otlp_tracing = false
 # tracing exporter endpoint in the format `ip:port`; we use gRPC OTLP as the exporter, and the default endpoint is `localhost:4317`
 # otlp_endpoint = "localhost:4317"
+# The ratio of traces that will be sampled and exported. Valid range `[0, 1]`: 1 means all traces are sampled, 0 means none. Values > 1 are treated as 1 and values < 0 as 0. The default is 1.0.
+# tracing_sample_ratio = 1.0
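
For illustration, enabling tracing with sampling in this config file might look like the following once the options are uncommented (the 0.1 ratio is an arbitrary example, not a default):

    enable_otlp_tracing = true
    otlp_endpoint = "localhost:4317"
    # Export roughly 10% of traces.
    tracing_sample_ratio = 0.1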

View File

@@ -20,6 +20,7 @@ use once_cell::sync::Lazy;
 use opentelemetry::{global, KeyValue};
 use opentelemetry_otlp::WithExportConfig;
 use opentelemetry_sdk::propagation::TraceContextPropagator;
+use opentelemetry_sdk::trace::Sampler;
 use opentelemetry_semantic_conventions::resource;
 use serde::{Deserialize, Serialize};
 use tracing_appender::non_blocking::WorkerGuard;
@@ -34,15 +35,28 @@ pub use crate::{debug, error, info, trace, warn};
 const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 #[serde(default)]
 pub struct LoggingOptions {
     pub dir: String,
     pub level: Option<String>,
     pub enable_otlp_tracing: bool,
     pub otlp_endpoint: Option<String>,
+    pub tracing_sample_ratio: Option<f64>,
 }
+impl PartialEq for LoggingOptions {
+    fn eq(&self, other: &Self) -> bool {
+        self.dir == other.dir
+            && self.level == other.level
+            && self.enable_otlp_tracing == other.enable_otlp_tracing
+            && self.otlp_endpoint == other.otlp_endpoint
+            && self.tracing_sample_ratio == other.tracing_sample_ratio
+    }
+}
+impl Eq for LoggingOptions {}
 impl Default for LoggingOptions {
     fn default() -> Self {
         Self {
@@ -50,6 +64,7 @@ impl Default for LoggingOptions {
             level: None,
             enable_otlp_tracing: false,
             otlp_endpoint: None,
+            tracing_sample_ratio: None,
         }
     }
 }
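
A side note on the derive change earlier in this file: `f64` implements `PartialEq` but not `Eq`, because `NaN != NaN` makes equality non-reflexive, so adding `tracing_sample_ratio: Option<f64>` rules out `#[derive(PartialEq, Eq)]`. The patch instead compares fields by hand and asserts `Eq` manually. A minimal sketch of the underlying constraint:

    fn main() {
        // Equality on f64 is not reflexive, which is exactly what Eq forbids.
        assert!(f64::NAN != f64::NAN);
    }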
@@ -145,7 +160,10 @@ pub fn init_global_logging(
     let filter = targets_string
         .parse::<filter::Targets>()
         .expect("error parsing log level string");
+    let sampler = opts
+        .tracing_sample_ratio
+        .map(Sampler::TraceIdRatioBased)
+        .unwrap_or(Sampler::AlwaysOn);
     // Must enable 'tokio_unstable' cfg to use this feature.
     // For example: `RUSTFLAGS="--cfg tokio_unstable" cargo run -F common-telemetry/console -- standalone start`
     #[cfg(feature = "tokio-console")]
@@ -200,17 +218,19 @@ pub fn init_global_logging(
                     .unwrap_or(DEFAULT_OTLP_ENDPOINT.to_string()),
             ),
         )
-        .with_trace_config(opentelemetry_sdk::trace::config().with_resource(
-            opentelemetry_sdk::Resource::new(vec![
-                KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
-                KeyValue::new(
-                    resource::SERVICE_INSTANCE_ID,
-                    node_id.unwrap_or("none".to_string()),
-                ),
-                KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
-                KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
-            ]),
-        ))
+        .with_trace_config(
+            opentelemetry_sdk::trace::config()
+                .with_sampler(sampler)
+                .with_resource(opentelemetry_sdk::Resource::new(vec![
+                    KeyValue::new(resource::SERVICE_NAME, app_name.to_string()),
+                    KeyValue::new(
+                        resource::SERVICE_INSTANCE_ID,
+                        node_id.unwrap_or("none".to_string()),
+                    ),
+                    KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
+                    KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()),
+                ])),
+        )
         .install_batch(opentelemetry_sdk::runtime::Tokio)
         .expect("otlp tracer install failed");
     let tracing_layer = Some(tracing_opentelemetry::layer().with_tracer(tracer));
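
The sampler wiring above maps the optional ratio onto OpenTelemetry's built-in samplers. A self-contained sketch of that mapping, assuming only the `opentelemetry_sdk` crate already imported by this file:

    use opentelemetry_sdk::trace::Sampler;

    /// None samples everything; Some(r) samples a trace-id-based fraction r.
    fn sampler_from_ratio(ratio: Option<f64>) -> Sampler {
        ratio.map(Sampler::TraceIdRatioBased).unwrap_or(Sampler::AlwaysOn)
    }

    fn main() {
        assert!(matches!(sampler_from_ratio(None), Sampler::AlwaysOn));
        assert!(matches!(sampler_from_ratio(Some(0.1)), Sampler::TraceIdRatioBased(_)));
    }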

View File

@@ -44,6 +44,7 @@ pub struct FrontendOptions {
     pub logging: LoggingOptions,
     pub datanode: DatanodeOptions,
     pub user_provider: Option<String>,
+    pub flow_grpc_addr: Option<String>,
 }
 impl Default for FrontendOptions {
@@ -64,6 +65,7 @@ impl Default for FrontendOptions {
             logging: LoggingOptions::default(),
             datanode: DatanodeOptions::default(),
             user_provider: None,
+            flow_grpc_addr: None,
         }
     }
 }
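
Since `FrontendOptions` is deserialized from the frontend's TOML configuration, the new field would presumably be set as a plain top-level key, e.g. (hypothetical value):

    flow_grpc_addr = "http://127.0.0.1:14514"

Note, however, that the draft wiring further below still reads the flow worker address from the `FLOW_ADDR` environment variable rather than from this option.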

View File

@@ -24,6 +24,7 @@ pub mod standalone;
 use std::collections::HashMap;
 use std::sync::Arc;
+use api::v1::greptime_database_client::GreptimeDatabaseClient;
 use api::v1::meta::Role;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -82,6 +83,8 @@ use sql::statements::copy::CopyTable;
 use sql::statements::statement::Statement;
 use sqlparser::ast::ObjectName;
 pub use standalone::StandaloneDatanodeManager;
+use tokio::sync::Mutex;
+use tonic::transport::Channel;
 use self::region_query::FrontendRegionQueryHandler;
 use self::standalone::StandaloneTableMetadataCreator;
@@ -117,6 +120,12 @@ pub trait FrontendInstance:
 pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
 pub type StatementExecutorRef = Arc<StatementExecutor>;
+/// Sends certain queries to the flow worker through gRPC.
+pub struct FlowProxy {
+    flow_client: Mutex<GreptimeDatabaseClient<Channel>>,
+}
+pub type FlowProxyRef = Arc<FlowProxy>;
 #[derive(Clone)]
 pub struct Instance {
     catalog_manager: CatalogManagerRef,
@@ -128,6 +137,7 @@ pub struct Instance {
     heartbeat_task: Option<HeartbeatTask>,
     inserter: InserterRef,
     deleter: DeleterRef,
+    flow: Option<FlowProxyRef>,
 }
 impl Instance {
@@ -219,6 +229,7 @@ impl Instance {
             heartbeat_task,
             inserter,
             deleter,
+            flow: None,
         })
     }
@@ -345,6 +356,11 @@ impl Instance {
             inserter.clone(),
         ));
+        let addr = std::env::var("FLOW_ADDR").unwrap_or("http://[::1]:14514".to_string());
+        let conn = tonic::transport::Endpoint::new(addr)
+            .unwrap()
+            .connect_lazy();
+        let client = GreptimeDatabaseClient::new(conn);
         Ok(Instance {
             catalog_manager: catalog_manager.clone(),
             script_executor,
@@ -355,6 +371,12 @@ impl Instance {
             heartbeat_task: None,
             inserter,
             deleter,
+            flow: Some(
+                FlowProxy {
+                    flow_client: Mutex::new(client),
+                }
+                .into(),
+            ),
         })
     }
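
A note on the connection setup above: tonic's `connect_lazy` returns a `Channel` without dialing, and the actual connection is only established on the first RPC, so standalone startup will not fail if the flow worker at `FLOW_ADDR` is not up yet. A sketch of the same pattern in isolation (error handling illustrative; the diff uses `unwrap`):

    use api::v1::greptime_database_client::GreptimeDatabaseClient;
    use tonic::transport::{Channel, Endpoint, Error};

    fn lazy_flow_client(addr: String) -> Result<GreptimeDatabaseClient<Channel>, Error> {
        // Endpoint::new validates the URI eagerly; connect_lazy defers the dial.
        let channel = Endpoint::new(addr)?.connect_lazy();
        Ok(GreptimeDatabaseClient::new(channel))
    }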

View File

@@ -15,11 +15,14 @@
 use api::v1::ddl_request::{Expr as DdlExpr, Expr};
 use api::v1::greptime_request::Request;
 use api::v1::query_request::Query;
-use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests};
+use api::v1::{
+    DeleteRequests, GreptimeRequest, InsertRequests, RowDeleteRequests, RowInsertRequests,
+};
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use common_meta::table_name::TableName;
 use common_query::Output;
+use common_telemetry::info;
 use query::parser::PromQuery;
 use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
 use servers::query_handler::grpc::GrpcQueryHandler;
@@ -47,6 +50,27 @@ impl GrpcQueryHandler for Instance {
             .as_ref()
             .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
             .context(PermissionSnafu)?;
+        /*
+        // copy row inserts to flow worker
+        if let Request::RowInserts(_) = &request {
+            let full_req = GreptimeRequest {
+                header: None,
+                request: Some(request.clone()),
+            };
+            if let Some(flow_proxy) = &self.flow {
+                flow_proxy
+                    .flow_client
+                    .lock()
+                    .await
+                    .handle(full_req)
+                    .await
+                    .unwrap();
+            } else {
+                info!("flow proxy is not initialized");
+            }
+        };
+        let output = Output::AffectedRows(0);*/
         let output = match request {
             Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
@@ -187,10 +211,24 @@ impl Instance {
         requests: RowInsertRequests,
         ctx: QueryContextRef,
     ) -> Result<Output> {
+        let full_req = GreptimeRequest {
+            header: None,
+            request: Some(Request::RowInserts(requests.clone())),
+        };
+        if let Some(flow_proxy) = &self.flow {
+            let mut client = flow_proxy.flow_client.lock().await.clone();
+            client.handle(full_req).await.unwrap();
+        } else {
+            info!("flow proxy is not initialized");
+        }
+        Ok(Output::AffectedRows(0))
+        /*
         self.inserter
             .handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
             .await
             .context(TableOperationSnafu)
+        */
     }
     pub async fn handle_deletes(
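
The `handle_row_inserts` change above matches the "clone client instead" commit: tonic clients wrap a cheaply clonable `Channel`, so the code clones the client out of the `Mutex` and lets the guard drop before awaiting the RPC, rather than holding the lock across the network call. A minimal standalone sketch of that pattern (with a stand-in client type):

    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[derive(Clone)]
    struct FakeClient;

    impl FakeClient {
        async fn handle(&mut self) { /* stand-in for the gRPC call */ }
    }

    #[tokio::main]
    async fn main() {
        let shared = Arc::new(Mutex::new(FakeClient));
        // The MutexGuard returned by lock() is a temporary dropped at the end
        // of this statement, so the lock is not held across the await below.
        let mut client = shared.lock().await.clone();
        client.handle().await;
    }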