diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 863b6b6419..d8e538242e 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -42,11 +42,11 @@ use futures::future; use futures_util::{Stream, StreamExt, TryStreamExt}; use prost::Message; use snafu::{ensure, ResultExt}; -use tonic::metadata::{AsciiMetadataKey, MetadataValue}; +use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, MetadataMap, MetadataValue}; use tonic::transport::Channel; use crate::error::{ - ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidAsciiSnafu, + ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidTonicMetadataValueSnafu, ServerSnafu, }; use crate::{from_grpc_response, Client, Result}; @@ -165,26 +165,27 @@ impl Database { let mut request = tonic::Request::new(request); let metadata = request.metadata_mut(); - for (key, value) in hints { - let key = AsciiMetadataKey::from_bytes(format!("x-greptime-hint-{}", key).as_bytes()) - .map_err(|_| { - InvalidAsciiSnafu { - value: key.to_string(), - } - .build() - })?; - let value = value.parse().map_err(|_| { - InvalidAsciiSnafu { - value: value.to_string(), - } - .build() - })?; - metadata.insert(key, value); - } + Self::put_hints(metadata, hints)?; + let response = client.handle(request).await?.into_inner(); from_grpc_response(response) } + fn put_hints(metadata: &mut MetadataMap, hints: &[(&str, &str)]) -> Result<()> { + let Some(value) = hints + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .reduce(|a, b| format!("{},{}", a, b)) + else { + return Ok(()); + }; + + let key = AsciiMetadataKey::from_static("x-greptime-hints"); + let value = AsciiMetadataValue::from_str(&value).context(InvalidTonicMetadataValueSnafu)?; + metadata.insert(key, value); + Ok(()) + } + pub async fn handle(&self, request: Request) -> Result { let mut client = make_database_client(&self.client)?.inner; let request = self.to_rpc_request(request); @@ -242,39 +243,49 @@ impl Database { where S: AsRef, { - self.do_get(Request::Query(QueryRequest { + self.sql_with_hint(sql, &[]).await + } + + pub async fn sql_with_hint(&self, sql: S, hints: &[(&str, &str)]) -> Result + where + S: AsRef, + { + let request = Request::Query(QueryRequest { query: Some(Query::Sql(sql.as_ref().to_string())), - })) - .await + }); + self.do_get(request, hints).await } pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - self.do_get(Request::Query(QueryRequest { + let request = Request::Query(QueryRequest { query: Some(Query::LogicalPlan(logical_plan)), - })) - .await + }); + self.do_get(request, &[]).await } pub async fn create(&self, expr: CreateTableExpr) -> Result { - self.do_get(Request::Ddl(DdlRequest { + let request = Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateTable(expr)), - })) - .await + }); + self.do_get(request, &[]).await } pub async fn alter(&self, expr: AlterTableExpr) -> Result { - self.do_get(Request::Ddl(DdlRequest { + let request = Request::Ddl(DdlRequest { expr: Some(DdlExpr::AlterTable(expr)), - })) - .await + }); + self.do_get(request, &[]).await } - async fn do_get(&self, request: Request) -> Result { + async fn do_get(&self, request: Request, hints: &[(&str, &str)]) -> Result { let request = self.to_rpc_request(request); let request = Ticket { ticket: request.encode_to_vec().into(), }; + let mut request = tonic::Request::new(request); + Self::put_hints(request.metadata_mut(), hints)?; + let mut client = self.client.make_flight_client()?; let response = client.mut_inner().do_get(request).await.or_else(|e| { diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 3f680b1427..858318c736 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -110,13 +110,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to parse ascii string: {}", value))] - InvalidAscii { - value: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid Tonic metadata value"))] InvalidTonicMetadataValue { #[snafu(source)] @@ -143,10 +136,7 @@ impl ErrorExt for Error { | Error::ConvertFlightData { source, .. } | Error::CreateTlsChannel { source, .. } => source.status_code(), Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, - - Error::InvalidAscii { .. } | Error::InvalidTonicMetadataValue { .. } => { - StatusCode::InvalidArguments - } + Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments, } } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index b5ebf0b4b1..94d2a0bf07 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -478,10 +478,11 @@ impl TableRouteStorage { )) } + // TODO(LFC): restore its original visibility after some test utility codes are refined /// Builds a update table route transaction, /// it expected the remote value equals the `current_table_route_value`. /// It retrieves the latest value if the comparing failed. - pub(crate) fn build_update_txn( + pub fn build_update_txn( &self, table_id: TableId, current_table_route_value: &DeserializedValueWithBytes, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 032f3ffb2d..fddca06ed7 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -176,6 +176,10 @@ impl DatanodeBuilder { } } + pub fn options(&self) -> &DatanodeOptions { + &self.opts + } + pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self { self.meta_client = Some(client); self diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index e9ecca366a..f6d03a2a8f 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -620,6 +620,9 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Unknown hint: {}", hint))] + UnknownHint { hint: String }, } pub type Result = std::result::Result; @@ -690,7 +693,8 @@ impl ErrorExt for Error { | FailedToParseQuery { .. } | InvalidElasticsearchInput { .. } | InvalidJaegerQuery { .. } - | ParseTimestamp { .. } => StatusCode::InvalidArguments, + | ParseTimestamp { .. } + | UnknownHint { .. } => StatusCode::InvalidArguments, Catalog { source, .. } => source.status_code(), RowWriter { source, .. } => source.status_code(), diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 4e1ced8662..6f12c5b654 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -42,13 +42,13 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status, Streaming}; -use crate::error; use crate::error::{InvalidParameterSnafu, ParseJsonSnafu, Result, ToJsonSnafu}; pub use crate::grpc::flight::stream::FlightRecordBatchStream; use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler}; use crate::grpc::TonicResult; use crate::http::header::constants::GREPTIME_DB_HEADER_NAME; use crate::http::AUTHORIZATION_HEADER; +use crate::{error, hint_headers}; pub type TonicStream = Pin> + Send + 'static>>; @@ -183,6 +183,8 @@ impl FlightCraft for GreptimeRequestHandler { &self, request: Request, ) -> TonicResult>> { + let hints = hint_headers::extract_hints(request.metadata()); + let ticket = request.into_inner().ticket; let request = GreptimeRequest::decode(ticket.as_ref()).context(error::InvalidFlightTicketSnafu)?; @@ -194,7 +196,7 @@ impl FlightCraft for GreptimeRequestHandler { request_type = get_request_type(&request) ); async { - let output = self.handle_request(request, Default::default()).await?; + let output = self.handle_request(request, hints).await?; let stream = to_flight_data_stream(output, TracingContext::from_current_span()); Ok(Response::new(stream)) } diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index f29c3f8e56..b4736ab4d4 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -14,6 +14,7 @@ //! Handler for Greptime Database service. It's implemented by frontend. +use std::str::FromStr; use std::time::Instant; use api::helper::request_type; @@ -31,6 +32,7 @@ use common_grpc::flight::FlightDecoder; use common_query::Output; use common_runtime::runtime::RuntimeTrait; use common_runtime::Runtime; +use common_session::ReadPreference; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, error, tracing, warn}; use common_time::timezone::parse_timezone; @@ -43,10 +45,11 @@ use tokio::sync::mpsc; use crate::error::Error::UnsupportedAuthScheme; use crate::error::{ AuthSnafu, InvalidAuthHeaderInvalidUtf8ValueSnafu, InvalidBase64ValueSnafu, InvalidQuerySnafu, - JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, + JoinTaskSnafu, NotFoundAuthHeaderSnafu, Result, UnknownHintSnafu, }; use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream}; use crate::grpc::TonicResult; +use crate::hint_headers::READ_PREFERENCE_HINT; use crate::metrics; use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER}; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; @@ -82,7 +85,7 @@ impl GreptimeRequestHandler { })?; let header = request.header.as_ref(); - let query_ctx = create_query_context(header, hints); + let query_ctx = create_query_context(header, hints)?; let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?; query_ctx.set_current_user(user_info); @@ -279,8 +282,8 @@ pub(crate) async fn auth( pub(crate) fn create_query_context( header: Option<&RequestHeader>, - extensions: Vec<(String, String)>, -) -> QueryContextRef { + mut extensions: Vec<(String, String)>, +) -> Result { let (catalog, schema) = header .map(|header| { // We provide dbname field in newer versions of protos/sdks @@ -313,10 +316,25 @@ pub(crate) fn create_query_context( .current_catalog(catalog) .current_schema(schema) .timezone(timezone); + + if let Some(x) = extensions + .iter() + .position(|(k, _)| k == READ_PREFERENCE_HINT) + { + let (k, v) = extensions.swap_remove(x); + let Ok(read_preference) = ReadPreference::from_str(&v) else { + return UnknownHintSnafu { + hint: format!("{k}={v}"), + } + .fail(); + }; + ctx_builder = ctx_builder.read_preference(read_preference); + } + for (key, value) in extensions { ctx_builder = ctx_builder.set_extension(key, value); } - ctx_builder.build().into() + Ok(ctx_builder.build().into()) } /// Histogram timer for handling gRPC request. @@ -357,3 +375,42 @@ impl Drop for RequestTimer { .observe(self.start.elapsed().as_secs_f64()); } } + +#[cfg(test)] +mod tests { + use chrono::FixedOffset; + use common_time::Timezone; + + use super::*; + + #[test] + fn test_create_query_context() { + let header = RequestHeader { + catalog: "cat-a-log".to_string(), + timezone: "+01:00".to_string(), + ..Default::default() + }; + let query_context = create_query_context( + Some(&header), + vec![ + ("auto_create_table".to_string(), "true".to_string()), + ("read_preference".to_string(), "leader".to_string()), + ], + ) + .unwrap(); + assert_eq!(query_context.current_catalog(), "cat-a-log"); + assert_eq!(query_context.current_schema(), DEFAULT_SCHEMA_NAME); + assert_eq!( + query_context.timezone(), + Timezone::Offset(FixedOffset::east_opt(3600).unwrap()) + ); + assert!(matches!( + query_context.read_preference(), + ReadPreference::Leader + )); + assert_eq!( + query_context.extensions().into_iter().collect::>(), + vec![("auto_create_table".to_string(), "true".to_string())] + ); + } +} diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index f813459774..8658be1b88 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -77,7 +77,7 @@ impl PrometheusGateway for PrometheusGatewayService { }; let header = inner.header.as_ref(); - let query_ctx = create_query_context(header, Default::default()); + let query_ctx = create_query_context(header, Default::default())?; let user_info = auth(self.user_provider.clone(), header, &query_ctx).await?; query_ctx.set_current_user(user_info); diff --git a/src/servers/src/hint_headers.rs b/src/servers/src/hint_headers.rs index 1ead2edb9a..30a9fe59f8 100644 --- a/src/servers/src/hint_headers.rs +++ b/src/servers/src/hint_headers.rs @@ -18,13 +18,16 @@ use tonic::metadata::MetadataMap; // For the given format: `x-greptime-hints: auto_create_table=true, ttl=7d` pub const HINTS_KEY: &str = "x-greptime-hints"; -pub const HINT_KEYS: [&str; 6] = [ +pub const READ_PREFERENCE_HINT: &str = "read_preference"; + +const HINT_KEYS: [&str; 7] = [ "x-greptime-hint-auto_create_table", "x-greptime-hint-ttl", "x-greptime-hint-append_mode", "x-greptime-hint-merge_mode", "x-greptime-hint-physical_table", "x-greptime-hint-skip_wal", + "x-greptime-hint-read_preference", ]; pub(crate) fn extract_hints(headers: &T) -> Vec<(String, String)> { @@ -90,10 +93,14 @@ mod tests { "x-greptime-hint-physical_table", HeaderValue::from_static("table1"), ); + headers.insert( + "x-greptime-hint-read_preference", + HeaderValue::from_static("leader"), + ); let hints = extract_hints(&headers); - assert_eq!(hints.len(), 5); + assert_eq!(hints.len(), 6); assert_eq!( hints[0], ("auto_create_table".to_string(), "true".to_string()) @@ -105,6 +112,10 @@ mod tests { hints[4], ("physical_table".to_string(), "table1".to_string()) ); + assert_eq!( + hints[5], + ("read_preference".to_string(), "leader".to_string()) + ); } #[test] @@ -131,12 +142,13 @@ mod tests { let mut headers = HeaderMap::new(); headers.insert( "x-greptime-hints", - HeaderValue::from_static(" auto_create_table=true, ttl =3600d, append_mode=true , merge_mode=false , physical_table= table1"), + HeaderValue::from_static(" auto_create_table=true, ttl =3600d, append_mode=true , merge_mode=false , physical_table= table1,\ + read_preference=leader"), ); let hints = extract_hints(&headers); - assert_eq!(hints.len(), 5); + assert_eq!(hints.len(), 6); assert_eq!( hints[0], ("auto_create_table".to_string(), "true".to_string()) @@ -148,6 +160,10 @@ mod tests { hints[4], ("physical_table".to_string(), "table1".to_string()) ); + assert_eq!( + hints[5], + ("read_preference".to_string(), "leader".to_string()) + ); } #[test] @@ -170,10 +186,14 @@ mod tests { "x-greptime-hint-physical_table", MetadataValue::from_static("table1"), ); + metadata.insert( + "x-greptime-hint-read_preference", + MetadataValue::from_static("leader"), + ); let hints = extract_hints(&metadata); - assert_eq!(hints.len(), 5); + assert_eq!(hints.len(), 6); assert_eq!( hints[0], ("auto_create_table".to_string(), "true".to_string()) @@ -185,6 +205,10 @@ mod tests { hints[4], ("physical_table".to_string(), "table1".to_string()) ); + assert_eq!( + hints[5], + ("read_preference".to_string(), "leader".to_string()) + ); } #[test] diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index 0d655e5348..9e6f674dda 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; use std::net::SocketAddr; use std::sync::Arc; @@ -21,6 +22,7 @@ use common_runtime::Runtime; use common_telemetry::{error, info}; use futures::future::{try_join_all, AbortHandle, AbortRegistration, Abortable}; use snafu::{ensure, ResultExt}; +use strum::Display; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio_stream::wrappers::TcpListenerStream; @@ -32,12 +34,18 @@ pub(crate) type AbortableStream = Abortable; pub type ServerHandler = (Box, SocketAddr); /// [ServerHandlers] is used to manage the lifecycle of all the services like http or grpc in the GreptimeDB server. -#[derive(Clone)] +#[derive(Clone, Display)] pub enum ServerHandlers { Init(Arc>>), Started(Arc>>), } +impl Debug for ServerHandlers { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "ServerHandlers::{}", self) + } +} + impl Default for ServerHandlers { fn default() -> Self { Self::Init(Arc::new(std::sync::Mutex::new(HashMap::new()))) @@ -91,13 +99,7 @@ impl ServerHandlers { try_join_all(handlers.values_mut().map(|(server, addr)| async move { server.start(*addr).await?; - - let bind_addr = server.bind_addr(); - info!( - "Server {} is started and bind to {:?}", - server.name(), - bind_addr, - ); + info!("Server {} is started", server.name()); Ok::<(), error::Error>(()) })) .await?; diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 3866e77912..2d8fefe073 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -125,6 +125,15 @@ impl QueryContextBuilder { .explain_options = explain_options; self } + + pub fn read_preference(mut self, read_preference: ReadPreference) -> Self { + self.mutable_session_data + .get_or_insert_default() + .write() + .unwrap() + .read_preference = read_preference; + self + } } impl From<&RegionRequestHeader> for QueryContext {