mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 21:00:38 +00:00
feat: set read-preference for grpc client (#6069)
* feat: set read-preference for grpc client * todo * address PR comments * fix ci
This commit is contained in:
@@ -42,11 +42,11 @@ use futures::future;
|
||||
use futures_util::{Stream, StreamExt, TryStreamExt};
|
||||
use prost::Message;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use tonic::metadata::{AsciiMetadataKey, MetadataValue};
|
||||
use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap, MetadataValue};
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::error::{
|
||||
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidAsciiSnafu,
|
||||
ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu,
|
||||
InvalidTonicMetadataValueSnafu, ServerSnafu,
|
||||
};
|
||||
use crate::{from_grpc_response, Client, Result};
|
||||
@@ -165,26 +165,27 @@ impl Database {
|
||||
|
||||
let mut request = tonic::Request::new(request);
|
||||
let metadata = request.metadata_mut();
|
||||
for (key, value) in hints {
|
||||
let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint-{}", key).as_bytes())
|
||||
.map_err(|_| {
|
||||
InvalidAsciiSnafu {
|
||||
value: key.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
let value = value.parse().map_err(|_| {
|
||||
InvalidAsciiSnafu {
|
||||
value: value.to_string(),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
metadata.insert(key, value);
|
||||
}
|
||||
Self::put_hints(metadata, hints)?;
|
||||
|
||||
let response = client.handle(request).await?.into_inner();
|
||||
from_grpc_response(response)
|
||||
}
|
||||
|
||||
fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> {
|
||||
let Some(value) = hints
|
||||
.iter()
|
||||
.map(|(k, v)| format!("{}={}", k, v))
|
||||
.reduce(|a, b| format!("{},{}", a, b))
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let key = AsciiMetadataKey::from_static("x-greptime-hints");
|
||||
let value = AsciiMetadataValue::from_str(&value).context(InvalidTonicMetadataValueSnafu)?;
|
||||
metadata.insert(key, value);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle(&self, request: Request) -> Result<u32> {
|
||||
let mut client = make_database_client(&self.client)?.inner;
|
||||
let request = self.to_rpc_request(request);
|
||||
@@ -242,39 +243,49 @@ impl Database {
|
||||
where
|
||||
S: AsRef<str>,
|
||||
{
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
self.sql_with_hint(sql, &[]).await
|
||||
}
|
||||
|
||||
pub async fn sql_with_hint<S>(&self, sql: S, hints: &[(&str, &str)]) -> Result<Output>
|
||||
where
|
||||
S: AsRef<str>,
|
||||
{
|
||||
let request = Request::Query(QueryRequest {
|
||||
query: Some(Query::Sql(sql.as_ref().to_string())),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, hints).await
|
||||
}
|
||||
|
||||
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<Output> {
|
||||
self.do_get(Request::Query(QueryRequest {
|
||||
let request = Request::Query(QueryRequest {
|
||||
query: Some(Query::LogicalPlan(logical_plan)),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, &[]).await
|
||||
}
|
||||
|
||||
pub async fn create(&self, expr: CreateTableExpr) -> Result<Output> {
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
let request = Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::CreateTable(expr)),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, &[]).await
|
||||
}
|
||||
|
||||
pub async fn alter(&self, expr: AlterTableExpr) -> Result<Output> {
|
||||
self.do_get(Request::Ddl(DdlRequest {
|
||||
let request = Request::Ddl(DdlRequest {
|
||||
expr: Some(DdlExpr::AlterTable(expr)),
|
||||
}))
|
||||
.await
|
||||
});
|
||||
self.do_get(request, &[]).await
|
||||
}
|
||||
|
||||
async fn do_get(&self, request: Request) -> Result<Output> {
|
||||
async fn do_get(&self, request: Request, hints: &[(&str, &str)]) -> Result<Output> {
|
||||
let request = self.to_rpc_request(request);
|
||||
let request = Ticket {
|
||||
ticket: request.encode_to_vec().into(),
|
||||
};
|
||||
|
||||
let mut request = tonic::Request::new(request);
|
||||
Self::put_hints(request.metadata_mut(), hints)?;
|
||||
|
||||
let mut client = self.client.make_flight_client()?;
|
||||
|
||||
let response = client.mut_inner().do_get(request).await.or_else(|e| {
|
||||
|
||||
@@ -110,13 +110,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse ascii string: {}", value))]
|
||||
InvalidAscii {
|
||||
value: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid Tonic metadata value"))]
|
||||
InvalidTonicMetadataValue {
|
||||
#[snafu(source)]
|
||||
@@ -143,10 +136,7 @@ impl ErrorExt for Error {
|
||||
| Error::ConvertFlightData { source, .. }
|
||||
| Error::CreateTlsChannel { source, .. } => source.status_code(),
|
||||
Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected,
|
||||
|
||||
Error::InvalidAscii { .. } | Error::InvalidTonicMetadataValue { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -478,10 +478,11 @@ impl TableRouteStorage {
|
||||
))
|
||||
}
|
||||
|
||||
// TODO(LFC): restore its original visibility after some test utility codes are refined
|
||||
/// Builds a update table route transaction,
|
||||
/// it expected the remote value equals the `current_table_route_value`.
|
||||
/// It retrieves the latest value if the comparing failed.
|
||||
pub(crate) fn build_update_txn(
|
||||
pub fn build_update_txn(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
|
||||
|
||||
@@ -176,6 +176,10 @@ impl DatanodeBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn options(&self) -> &DatanodeOptions {
|
||||
&self.opts
|
||||
}
|
||||
|
||||
pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
|
||||
self.meta_client = Some(client);
|
||||
self
|
||||
|
||||
@@ -620,6 +620,9 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Unknown hint: {}", hint))]
|
||||
UnknownHint { hint: String },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -690,7 +693,8 @@ impl ErrorExt for Error {
|
||||
| FailedToParseQuery { .. }
|
||||
| InvalidElasticsearchInput { .. }
|
||||
| InvalidJaegerQuery { .. }
|
||||
| ParseTimestamp { .. } => StatusCode::InvalidArguments,
|
||||
| ParseTimestamp { .. }
|
||||
| UnknownHint { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
Catalog { source, .. } => source.status_code(),
|
||||
RowWriter { source, .. } => source.status_code(),
|
||||
|
||||
@@ -42,13 +42,13 @@ use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu};
|
||||
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
|
||||
use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
|
||||
use crate::grpc::TonicResult;
|
||||
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
|
||||
use crate::http::AUTHORIZATION_HEADER;
|
||||
use crate::{error, hint_headers};
|
||||
|
||||
pub type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + 'static>>;
|
||||
|
||||
@@ -183,6 +183,8 @@ impl FlightCraft for GreptimeRequestHandler {
|
||||
&self,
|
||||
request: Request<Ticket>,
|
||||
) -> TonicResult<Response<TonicStream<FlightData>>> {
|
||||
let hints = hint_headers::extract_hints(request.metadata());
|
||||
|
||||
let ticket = request.into_inner().ticket;
|
||||
let request =
|
||||
GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?;
|
||||
@@ -194,7 +196,7 @@ impl FlightCraft for GreptimeRequestHandler {
|
||||
request_type = get_request_type(&request)
|
||||
);
|
||||
async {
|
||||
let output = self.handle_request(request, Default::default()).await?;
|
||||
let output = self.handle_request(request, hints).await?;
|
||||
let stream = to_flight_data_stream(output, TracingContext::from_current_span());
|
||||
Ok(Response::new(stream))
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Handler for Greptime Database service. It's implemented by frontend.
|
||||
|
||||
use std::str::FromStr;
|
||||
use std::time::Instant;
|
||||
|
||||
use api::helper::request_type;
|
||||
@@ -31,6 +32,7 @@ use common_grpc::flight::FlightDecoder;
|
||||
use common_query::Output;
|
||||
use common_runtime::runtime::RuntimeTrait;
|
||||
use common_runtime::Runtime;
|
||||
use common_session::ReadPreference;
|
||||
use common_telemetry::tracing_context::{FutureExt, TracingContext};
|
||||
use common_telemetry::{debug, error, tracing, warn};
|
||||
use common_time::timezone::parse_timezone;
|
||||
@@ -43,10 +45,11 @@ use tokio::sync::mpsc;
|
||||
use crate::error::Error::UnsupportedAuthScheme;
|
||||
use crate::error::{
|
||||
AuthSnafu, InvalidAuthHeaderInvalidUtf8ValueSnafu, InvalidBase64ValueSnafu, InvalidQuerySnafu,
|
||||
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result,
|
||||
JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu,
|
||||
};
|
||||
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
|
||||
use crate::grpc::TonicResult;
|
||||
use crate::hint_headers::READ_PREFERENCE_HINT;
|
||||
use crate::metrics;
|
||||
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
|
||||
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
|
||||
@@ -82,7 +85,7 @@ impl GreptimeRequestHandler {
|
||||
})?;
|
||||
|
||||
let header = request.header.as_ref();
|
||||
let query_ctx = create_query_context(header, hints);
|
||||
let query_ctx = create_query_context(header, hints)?;
|
||||
let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
|
||||
query_ctx.set_current_user(user_info);
|
||||
|
||||
@@ -279,8 +282,8 @@ pub(crate) async fn auth(
|
||||
|
||||
pub(crate) fn create_query_context(
|
||||
header: Option<&RequestHeader>,
|
||||
extensions: Vec<(String, String)>,
|
||||
) -> QueryContextRef {
|
||||
mut extensions: Vec<(String, String)>,
|
||||
) -> Result<QueryContextRef> {
|
||||
let (catalog, schema) = header
|
||||
.map(|header| {
|
||||
// We provide dbname field in newer versions of protos/sdks
|
||||
@@ -313,10 +316,25 @@ pub(crate) fn create_query_context(
|
||||
.current_catalog(catalog)
|
||||
.current_schema(schema)
|
||||
.timezone(timezone);
|
||||
|
||||
if let Some(x) = extensions
|
||||
.iter()
|
||||
.position(|(k, _)| k == READ_PREFERENCE_HINT)
|
||||
{
|
||||
let (k, v) = extensions.swap_remove(x);
|
||||
let Ok(read_preference) = ReadPreference::from_str(&v) else {
|
||||
return UnknownHintSnafu {
|
||||
hint: format!("{k}={v}"),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
ctx_builder = ctx_builder.read_preference(read_preference);
|
||||
}
|
||||
|
||||
for (key, value) in extensions {
|
||||
ctx_builder = ctx_builder.set_extension(key, value);
|
||||
}
|
||||
ctx_builder.build().into()
|
||||
Ok(ctx_builder.build().into())
|
||||
}
|
||||
|
||||
/// Histogram timer for handling gRPC request.
|
||||
@@ -357,3 +375,42 @@ impl Drop for RequestTimer {
|
||||
.observe(self.start.elapsed().as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use chrono::FixedOffset;
|
||||
use common_time::Timezone;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_create_query_context() {
|
||||
let header = RequestHeader {
|
||||
catalog: "cat-a-log".to_string(),
|
||||
timezone: "+01:00".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
let query_context = create_query_context(
|
||||
Some(&header),
|
||||
vec![
|
||||
("auto_create_table".to_string(), "true".to_string()),
|
||||
("read_preference".to_string(), "leader".to_string()),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(query_context.current_catalog(), "cat-a-log");
|
||||
assert_eq!(query_context.current_schema(), DEFAULT_SCHEMA_NAME);
|
||||
assert_eq!(
|
||||
query_context.timezone(),
|
||||
Timezone::Offset(FixedOffset::east_opt(3600).unwrap())
|
||||
);
|
||||
assert!(matches!(
|
||||
query_context.read_preference(),
|
||||
ReadPreference::Leader
|
||||
));
|
||||
assert_eq!(
|
||||
query_context.extensions().into_iter().collect::<Vec<_>>(),
|
||||
vec![("auto_create_table".to_string(), "true".to_string())]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ impl PrometheusGateway for PrometheusGatewayService {
|
||||
};
|
||||
|
||||
let header = inner.header.as_ref();
|
||||
let query_ctx = create_query_context(header, Default::default());
|
||||
let query_ctx = create_query_context(header, Default::default())?;
|
||||
let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?;
|
||||
query_ctx.set_current_user(user_info);
|
||||
|
||||
|
||||
@@ -18,13 +18,16 @@ use tonic::metadata::MetadataMap;
|
||||
// For the given format: `x-greptime-hints: auto_create_table=true, ttl=7d`
|
||||
pub const HINTS_KEY: &str = "x-greptime-hints";
|
||||
|
||||
pub const HINT_KEYS: [&str; 6] = [
|
||||
pub const READ_PREFERENCE_HINT: &str = "read_preference";
|
||||
|
||||
const HINT_KEYS: [&str; 7] = [
|
||||
"x-greptime-hint-auto_create_table",
|
||||
"x-greptime-hint-ttl",
|
||||
"x-greptime-hint-append_mode",
|
||||
"x-greptime-hint-merge_mode",
|
||||
"x-greptime-hint-physical_table",
|
||||
"x-greptime-hint-skip_wal",
|
||||
"x-greptime-hint-read_preference",
|
||||
];
|
||||
|
||||
pub(crate) fn extract_hints<T: ToHeaderMap>(headers: &T) -> Vec<(String, String)> {
|
||||
@@ -90,10 +93,14 @@ mod tests {
|
||||
"x-greptime-hint-physical_table",
|
||||
HeaderValue::from_static("table1"),
|
||||
);
|
||||
headers.insert(
|
||||
"x-greptime-hint-read_preference",
|
||||
HeaderValue::from_static("leader"),
|
||||
);
|
||||
|
||||
let hints = extract_hints(&headers);
|
||||
|
||||
assert_eq!(hints.len(), 5);
|
||||
assert_eq!(hints.len(), 6);
|
||||
assert_eq!(
|
||||
hints[0],
|
||||
("auto_create_table".to_string(), "true".to_string())
|
||||
@@ -105,6 +112,10 @@ mod tests {
|
||||
hints[4],
|
||||
("physical_table".to_string(), "table1".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
hints[5],
|
||||
("read_preference".to_string(), "leader".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -131,12 +142,13 @@ mod tests {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(
|
||||
"x-greptime-hints",
|
||||
HeaderValue::from_static(" auto_create_table=true, ttl =3600d, append_mode=true , merge_mode=false , physical_table= table1"),
|
||||
HeaderValue::from_static(" auto_create_table=true, ttl =3600d, append_mode=true , merge_mode=false , physical_table= table1,\
|
||||
read_preference=leader"),
|
||||
);
|
||||
|
||||
let hints = extract_hints(&headers);
|
||||
|
||||
assert_eq!(hints.len(), 5);
|
||||
assert_eq!(hints.len(), 6);
|
||||
assert_eq!(
|
||||
hints[0],
|
||||
("auto_create_table".to_string(), "true".to_string())
|
||||
@@ -148,6 +160,10 @@ mod tests {
|
||||
hints[4],
|
||||
("physical_table".to_string(), "table1".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
hints[5],
|
||||
("read_preference".to_string(), "leader".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -170,10 +186,14 @@ mod tests {
|
||||
"x-greptime-hint-physical_table",
|
||||
MetadataValue::from_static("table1"),
|
||||
);
|
||||
metadata.insert(
|
||||
"x-greptime-hint-read_preference",
|
||||
MetadataValue::from_static("leader"),
|
||||
);
|
||||
|
||||
let hints = extract_hints(&metadata);
|
||||
|
||||
assert_eq!(hints.len(), 5);
|
||||
assert_eq!(hints.len(), 6);
|
||||
assert_eq!(
|
||||
hints[0],
|
||||
("auto_create_table".to_string(), "true".to_string())
|
||||
@@ -185,6 +205,10 @@ mod tests {
|
||||
hints[4],
|
||||
("physical_table".to_string(), "table1".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
hints[5],
|
||||
("read_preference".to_string(), "leader".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -21,6 +22,7 @@ use common_runtime::Runtime;
|
||||
use common_telemetry::{error, info};
|
||||
use futures::future::{try_join_all, AbortHandle, AbortRegistration, Abortable};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use strum::Display;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
@@ -32,12 +34,18 @@ pub(crate) type AbortableStream = Abortable<TcpListenerStream>;
|
||||
pub type ServerHandler = (Box<dyn Server>, SocketAddr);
|
||||
|
||||
/// [ServerHandlers] is used to manage the lifecycle of all the services like http or grpc in the GreptimeDB server.
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Display)]
|
||||
pub enum ServerHandlers {
|
||||
Init(Arc<std::sync::Mutex<HashMap<String, ServerHandler>>>),
|
||||
Started(Arc<HashMap<String, Box<dyn Server>>>),
|
||||
}
|
||||
|
||||
impl Debug for ServerHandlers {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "ServerHandlers::{}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ServerHandlers {
|
||||
fn default() -> Self {
|
||||
Self::Init(Arc::new(std::sync::Mutex::new(HashMap::new())))
|
||||
@@ -91,13 +99,7 @@ impl ServerHandlers {
|
||||
|
||||
try_join_all(handlers.values_mut().map(|(server, addr)| async move {
|
||||
server.start(*addr).await?;
|
||||
|
||||
let bind_addr = server.bind_addr();
|
||||
info!(
|
||||
"Server {} is started and bind to {:?}",
|
||||
server.name(),
|
||||
bind_addr,
|
||||
);
|
||||
info!("Server {} is started", server.name());
|
||||
Ok::<(), error::Error>(())
|
||||
}))
|
||||
.await?;
|
||||
|
||||
@@ -125,6 +125,15 @@ impl QueryContextBuilder {
|
||||
.explain_options = explain_options;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn read_preference(mut self, read_preference: ReadPreference) -> Self {
|
||||
self.mutable_session_data
|
||||
.get_or_insert_default()
|
||||
.write()
|
||||
.unwrap()
|
||||
.read_preference = read_preference;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&RegionRequestHeader> for QueryContext {
|
||||
|
||||
Reference in New Issue
Block a user