mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 08:50:40 +00:00
@@ -35,9 +35,9 @@ use common_time::timezone::parse_timezone;
|
||||
use futures_util::StreamExt;
|
||||
use session::context::{
|
||||
Channel, QueryContextBuilder, QueryContextRef, REMOTE_QUERY_ID_EXTENSION_KEY,
|
||||
generate_remote_query_id, is_reserved_extension_key,
|
||||
generate_remote_query_id,
|
||||
};
|
||||
use session::hints::READ_PREFERENCE_HINT;
|
||||
use session::hints::{READ_PREFERENCE_HINT, is_reserved_extension_key};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
@@ -239,6 +239,10 @@ pub(crate) fn create_query_context(
|
||||
|
||||
for (key, value) in extensions {
|
||||
if is_reserved_extension_key(&key) {
|
||||
debug!(
|
||||
key = key.as_str(),
|
||||
"Ignoring reserved external query context extension key"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
ctx_builder = ctx_builder.set_extension(key, value);
|
||||
|
||||
@@ -16,58 +16,65 @@ use axum::body::Body;
|
||||
use axum::http::Request;
|
||||
use axum::middleware::Next;
|
||||
use axum::response::Response;
|
||||
use session::context::{QueryContext, REMOTE_QUERY_ID_EXTENSION_KEY, is_reserved_extension_key};
|
||||
use common_telemetry::debug;
|
||||
use session::context::QueryContext;
|
||||
use session::hints::is_reserved_extension_key;
|
||||
|
||||
use crate::hint_headers;
|
||||
|
||||
pub async fn extract_hints(mut request: Request<Body>, next: Next) -> Response {
|
||||
let hints = hint_headers::extract_hints(request.headers());
|
||||
if let Some(query_ctx) = request.extensions_mut().get_mut::<QueryContext>() {
|
||||
for (key, value) in hints {
|
||||
apply_hint(query_ctx, key, value);
|
||||
}
|
||||
apply_hints(query_ctx, hints);
|
||||
}
|
||||
next.run(request).await
|
||||
}
|
||||
|
||||
fn apply_hint(query_ctx: &mut QueryContext, key: String, value: String) {
|
||||
if is_reserved_extension_key(&key) {
|
||||
return;
|
||||
fn apply_hints(query_ctx: &mut QueryContext, hints: Vec<(String, String)>) {
|
||||
for (key, value) in hints {
|
||||
if is_reserved_extension_key(&key) {
|
||||
debug!(
|
||||
key = key.as_str(),
|
||||
"Ignoring reserved external query context extension key"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
query_ctx.set_extension(key, value);
|
||||
}
|
||||
query_ctx.set_extension(key, value);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use session::context::{QueryContextBuilder, generate_remote_query_id};
|
||||
use session::hints::REMOTE_QUERY_ID_EXTENSION_KEY;
|
||||
|
||||
use super::*;
|
||||
use super::apply_hints;
|
||||
|
||||
#[test]
|
||||
fn test_apply_hint_ignores_remote_query_id() {
|
||||
let expected_remote_query_id = generate_remote_query_id();
|
||||
fn test_apply_hints_ignores_reserved_extension_keys() {
|
||||
let original_query_id = generate_remote_query_id();
|
||||
let mut query_ctx = QueryContextBuilder::default()
|
||||
.set_extension(
|
||||
REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
|
||||
expected_remote_query_id.clone(),
|
||||
original_query_id.clone(),
|
||||
)
|
||||
.build();
|
||||
|
||||
apply_hint(
|
||||
apply_hints(
|
||||
&mut query_ctx,
|
||||
REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
|
||||
"spoofed-query-id".to_string(),
|
||||
);
|
||||
apply_hint(
|
||||
&mut query_ctx,
|
||||
"auto_create_table".to_string(),
|
||||
"true".to_string(),
|
||||
vec![
|
||||
(
|
||||
REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
|
||||
"spoofed".to_string(),
|
||||
),
|
||||
("ttl".to_string(), "7d".to_string()),
|
||||
],
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
query_ctx.remote_query_id(),
|
||||
Some(expected_remote_query_id.as_str())
|
||||
Some(original_query_id.as_str())
|
||||
);
|
||||
assert_eq!(query_ctx.extension("auto_create_table"), Some("true"));
|
||||
assert_eq!(query_ctx.extension("ttl"), Some("7d"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ use datafusion_common::config::ConfigOptions;
|
||||
use derive_builder::Builder;
|
||||
use sql::dialect::{Dialect, GenericDialect, GreptimeDbDialect, MySqlDialect, PostgreSqlDialect};
|
||||
|
||||
pub use crate::hints::REMOTE_QUERY_ID_EXTENSION_KEY;
|
||||
use crate::protocol_ctx::ProtocolCtx;
|
||||
use crate::query_id::QueryId;
|
||||
use crate::session_config::{PGByteaOutputValue, PGDateOrder, PGDateTimeStyle, PGIntervalStyle};
|
||||
@@ -41,11 +42,6 @@ pub type QueryContextRef = Arc<QueryContext>;
|
||||
pub type ConnInfoRef = Arc<ConnInfo>;
|
||||
|
||||
const CURSOR_COUNT_WARNING_LIMIT: usize = 10;
|
||||
pub const REMOTE_QUERY_ID_EXTENSION_KEY: &str = "remote_query_id";
|
||||
|
||||
pub fn is_reserved_extension_key(key: &str) -> bool {
|
||||
key == REMOTE_QUERY_ID_EXTENSION_KEY
|
||||
}
|
||||
|
||||
pub fn generate_remote_query_id() -> String {
|
||||
generate_remote_query_id_value().to_string()
|
||||
@@ -793,7 +789,15 @@ mod test {
|
||||
assert_eq!(roundtrip_api.current_catalog, api_ctx.current_catalog);
|
||||
assert_eq!(roundtrip_api.current_schema, api_ctx.current_schema);
|
||||
assert_eq!(roundtrip_api.timezone, api_ctx.timezone);
|
||||
assert_eq!(roundtrip_api.extensions, api_ctx.extensions);
|
||||
assert_eq!(
|
||||
roundtrip_api.extensions.get("flow.return_region_seq"),
|
||||
Some(&"true".to_string())
|
||||
);
|
||||
assert!(
|
||||
roundtrip_api
|
||||
.extensions
|
||||
.contains_key(REMOTE_QUERY_ID_EXTENSION_KEY)
|
||||
);
|
||||
assert_eq!(roundtrip_api.channel, api_ctx.channel);
|
||||
assert_eq!(roundtrip_api.snapshot_seqs, api_ctx.snapshot_seqs);
|
||||
}
|
||||
|
||||
@@ -16,8 +16,10 @@
|
||||
pub const HINTS_KEY: &str = "x-greptime-hints";
|
||||
/// Deprecated, use `HINTS_KEY` instead. Notes if "x-greptime-hints" is set, keys with this prefix will be ignored.
|
||||
pub const HINTS_KEY_PREFIX: &str = "x-greptime-hint-";
|
||||
pub const REMOTE_QUERY_ID_EXTENSION_KEY: &str = "remote_query_id";
|
||||
|
||||
pub const READ_PREFERENCE_HINT: &str = "read_preference";
|
||||
pub const RESERVED_EXTENSION_KEYS: [&str; 1] = [REMOTE_QUERY_ID_EXTENSION_KEY];
|
||||
|
||||
/// Deprecated, use `HINTS_KEY` instead.
|
||||
pub const HINT_KEYS: [&str; 7] = [
|
||||
@@ -29,3 +31,18 @@ pub const HINT_KEYS: [&str; 7] = [
|
||||
"x-greptime-hint-skip_wal",
|
||||
"x-greptime-hint-read_preference",
|
||||
];
|
||||
|
||||
pub fn is_reserved_extension_key(key: &str) -> bool {
|
||||
RESERVED_EXTENSION_KEYS.contains(&key)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_is_reserved_extension_key() {
|
||||
assert!(is_reserved_extension_key(REMOTE_QUERY_ID_EXTENSION_KEY));
|
||||
assert!(!is_reserved_extension_key(READ_PREFERENCE_HINT));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user