test: cover standalone user provider config (#8067)

* test: cover standalone user provider config

Signed-off-by: Detachm <42765252+Detachm@users.noreply.github.com>

* test: cover config-driven http auth

Signed-off-by: Detachm <42765252+Detachm@users.noreply.github.com>

---------

Signed-off-by: Detachm <42765252+Detachm@users.noreply.github.com>
This commit is contained in:
Han
2026-05-08 16:56:22 +08:00
committed by GitHub
parent e1156728fc
commit b5997c6797
13 changed files with 188 additions and 42 deletions

2
Cargo.lock generated
View File

@@ -14081,6 +14081,7 @@ dependencies = [
"async-trait",
"auth",
"axum 0.8.4",
"base64 0.22.1",
"cache",
"catalog",
"chrono",
@@ -14135,6 +14136,7 @@ dependencies = [
"partition",
"paste",
"pipeline",
"plugins",
"prost 0.14.1",
"query",
"rand 0.9.1",

View File

@@ -14,6 +14,7 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `user_provider` | String | Unset | The user provider for authentication.<br/>Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.<br/>Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
@@ -231,6 +232,7 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `user_provider` | String | Unset | The user provider for authentication.<br/>Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.<br/>Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
| `runtime` | -- | -- | The runtime options. |
@@ -624,6 +626,7 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
| `user_provider` | String | Unset | The user provider for authentication.<br/>Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `flow.batching_mode` | -- | -- | -- |

View File

@@ -2,6 +2,11 @@
## @toml2docs:none-default
node_id = 14
## The user provider for authentication.
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
## @toml2docs:none-default
#+ user_provider = "static_user_provider:file:/path/to/users"
## flow engine options.
[flow]
## The number of flow worker in flownode.

View File

@@ -6,6 +6,11 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
## The user provider for authentication.
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
## @toml2docs:none-default
#+ user_provider = "static_user_provider:file:/path/to/users"
## Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default

View File

@@ -6,6 +6,11 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
## The user provider for authentication.
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
## @toml2docs:none-default
#+ user_provider = "static_user_provider:file:/path/to/users"
## Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default

View File

@@ -312,6 +312,32 @@ fn test_load_standalone_example_config() {
similar_asserts::assert_eq!(options, expected);
}
#[test]
fn test_load_standalone_user_provider_from_config() {
let config = tempfile::NamedTempFile::new().unwrap();
let user_provider = "static_user_provider:file:/tmp/greptimedb-users";
std::fs::write(
config.path(),
format!("user_provider = \"{user_provider}\"\n"),
)
.unwrap();
let options =
GreptimeOptions::<StandaloneOptions>::load_layered_options(config.path().to_str(), "")
.unwrap();
assert_eq!(
options.component.user_provider.as_deref(),
Some(user_provider)
);
let frontend_options = options.component.frontend_options();
assert_eq!(
frontend_options.user_provider.as_deref(),
Some(user_provider)
);
}
#[test]
fn test_load_heartbeat_env_vars_from_env() {
let env_prefix = "HEARTBEAT_ENV_VARS_UT";

View File

@@ -362,14 +362,13 @@ mod tests {
#[test]
fn test_decode_basic() {
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
let credential = "dXNlcm5hbWU6cGFzc3dvcmQ=";
let (username, pwd) = decode_basic(credential).unwrap();
let credential = basic_auth_credentials("username", "password");
let (username, pwd) = decode_basic(&credential).unwrap();
assert_eq!("username", username);
assert_eq!("password", pwd.expose_secret());
let wrong_credential = "dXNlcm5hbWU6cG Fzc3dvcmQ=";
let result = decode_basic(wrong_credential);
let wrong_credential = credential.replacen('c', "c ", 1);
let result = decode_basic(&wrong_credential);
assert_matches!(result.err(), Some(error::Error::InvalidBase64Value { .. }));
}
@@ -379,8 +378,8 @@ mod tests {
let re: Result<AuthScheme> = auth_scheme_str.try_into();
assert!(re.is_err());
let auth_scheme_str = "basic dGVzdDp0ZXN0";
let scheme: AuthScheme = auth_scheme_str.try_into().unwrap();
let auth_scheme_str = basic_auth("test", "test");
let scheme: AuthScheme = auth_scheme_str.as_str().try_into().unwrap();
assert_matches!(scheme, AuthScheme::Basic(username, pwd) if username == "test" && pwd.expose_secret() == "test");
let unsupported = "digest";
@@ -390,21 +389,37 @@ mod tests {
#[test]
fn test_auth_header() {
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
let req = mock_http_request(Some("Basic dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
let header_value = basic_auth("username", "password");
let req = mock_http_request(Some(&header_value), None).unwrap();
let auth_scheme = auth_header(&req).unwrap();
assert_matches!(auth_scheme, AuthScheme::Basic(username, pwd) if username == "username" && pwd.expose_secret() == "password");
let wrong_req = mock_http_request(Some("Basic dXNlcm5hbWU6 cGFzc3dvcmQ="), None).unwrap();
let wrong_auth_header = header_value.replacen('c', "c ", 1);
let wrong_req = mock_http_request(Some(&wrong_auth_header), None).unwrap();
let res = auth_header(&wrong_req);
assert_matches!(res.err(), Some(error::Error::InvalidAuthHeader { .. }));
let wrong_req = mock_http_request(Some("Digest dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
let wrong_req = mock_http_request(
Some(&format!(
"Digest {}",
basic_auth_credentials("username", "password")
)),
None,
)
.unwrap();
let res = auth_header(&wrong_req);
assert_matches!(res.err(), Some(error::Error::UnsupportedAuthScheme { .. }));
}
fn basic_auth(username: &str, password: &str) -> String {
format!("Basic {}", basic_auth_credentials(username, password))
}
fn basic_auth_credentials(username: &str, password: &str) -> String {
BASE64_STANDARD.encode(format!("{username}:{password}"))
}
fn mock_http_request(auth_header: Option<&str>, uri: Option<&str>) -> Result<Request<()>> {
let http_api_version = crate::http::HTTP_API_VERSION;
let mut req = Request::builder()

View File

@@ -17,14 +17,15 @@ use std::sync::Arc;
use auth::UserProvider;
use auth::tests::MockUserProvider;
use axum::http;
use base64::prelude::{BASE64_STANDARD, Engine as _};
use hyper::{Request, StatusCode};
use servers::http::AUTHORIZATION_HEADER;
use servers::http::authorize::inner_auth;
use session::context::QueryContext;
async fn check_http_auth(header_key: &str) {
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
let req = mock_http_request(header_key, Some("Basic dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
let req =
mock_http_request(header_key, Some(&basic_auth("username", "password")), None).unwrap();
let req = inner_auth(None, req).await.unwrap();
let ctx: &QueryContext = req.extensions().get().unwrap();
let user_info = ctx.current_user();
@@ -34,8 +35,8 @@ async fn check_http_auth(header_key: &str) {
// In mock user provider, right username:password == "greptime:greptime"
let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc<dyn UserProvider>);
// base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU="
let req = mock_http_request(header_key, Some("Basic Z3JlcHRpbWU6Z3JlcHRpbWU="), None).unwrap();
let req =
mock_http_request(header_key, Some(&basic_auth("greptime", "greptime")), None).unwrap();
let req = inner_auth(mock_user_provider.clone(), req).await.unwrap();
let ctx: &QueryContext = req.extensions().get().unwrap();
let user_info = ctx.current_user();
@@ -52,9 +53,8 @@ async fn check_http_auth(header_key: &str) {
axum::body::to_bytes(resp.into_body(), usize::MAX).await.unwrap().as_ref()
);
// base64encode("username:password") == "dXNlcm5hbWU6cGFzc3dvcmQ="
let wrong_req =
mock_http_request(header_key, Some("Basic dXNlcm5hbWU6cGFzc3dvcmQ="), None).unwrap();
mock_http_request(header_key, Some(&basic_auth("username", "password")), None).unwrap();
let auth_res = inner_auth(mock_user_provider, wrong_req).await;
assert!(auth_res.is_err());
let resp = auth_res.unwrap_err();
@@ -78,12 +78,11 @@ async fn check_schema_validating(header: &str) {
// In mock user provider, right username:password == "greptime:greptime"
let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc<dyn UserProvider>);
// base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU="
// http://localhost/{http_api_version}/sql?db=greptime
let version = servers::http::HTTP_API_VERSION;
let req = mock_http_request(
header,
Some("Basic Z3JlcHRpbWU6Z3JlcHRpbWU="),
Some(&basic_auth("greptime", "greptime")),
Some(format!("http://localhost/{version}/sql?db=public").as_str()),
)
.unwrap();
@@ -96,7 +95,7 @@ async fn check_schema_validating(header: &str) {
// wrong database
let req = mock_http_request(
header,
Some("Basic Z3JlcHRpbWU6Z3JlcHRpbWU="),
Some(&basic_auth("greptime", "greptime")),
Some(format!("http://localhost/{version}/sql?db=wrong").as_str()),
)
.unwrap();
@@ -120,7 +119,6 @@ async fn check_auth_header(header_key: &str) {
// In mock user provider, right username:password == "greptime:greptime"
let mock_user_provider = Some(Arc::new(MockUserProvider::default()) as Arc<dyn UserProvider>);
// base64encode("greptime:greptime") == "Z3JlcHRpbWU6Z3JlcHRpbWU="
// try auth path first
let req = mock_http_request(header_key, None, None).unwrap();
let auth_res = inner_auth(mock_user_provider.clone(), req).await;
@@ -144,6 +142,13 @@ async fn test_whitelist_no_auth() {
check_auth_header(AUTHORIZATION_HEADER).await;
}
fn basic_auth(username: &str, password: &str) -> String {
format!(
"Basic {}",
BASE64_STANDARD.encode(format!("{username}:{password}"))
)
}
// copy from http::authorize
fn mock_http_request(
auth_header_key: &str,

View File

@@ -18,6 +18,7 @@ use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::tests::{DatabaseAuthInfo, MockUserProvider};
use axum::{Router, http};
use base64::prelude::{BASE64_STANDARD, Engine as _};
use common_query::Output;
use common_test_util::ports;
use datafusion_expr::LogicalPlan;
@@ -34,6 +35,13 @@ use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
fn basic_auth(username: &str, password: &str) -> String {
format!(
"basic {}",
BASE64_STANDARD.encode(format!("{username}:{password}"))
)
}
struct DummyInstance {
tx: Arc<mpsc::Sender<(String, String)>>,
}
@@ -146,7 +154,7 @@ async fn test_influxdb_write() {
.body("monitor,host=host1 cpu=1.2 1664370459457010101")
.header(
http::header::AUTHORIZATION,
"basic Z3JlcHRpbWU6Z3JlcHRpbWU=",
basic_auth("greptime", "greptime"),
)
.send()
.await;

View File

@@ -23,6 +23,7 @@ async-stream.workspace = true
async-trait.workspace = true
auth.workspace = true
axum.workspace = true
base64.workspace = true
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
@@ -64,6 +65,7 @@ meta-srv = { workspace = true, features = ["mock"] }
mito2.workspace = true
object-store.workspace = true
operator = { workspace = true, features = ["testing"] }
plugins.workspace = true
prost.workspace = true
query.workspace = true
rand.workspace = true

View File

@@ -22,6 +22,7 @@ use api::v1::{
column,
};
use auth::user_provider_from_option;
use base64::prelude::{BASE64_STANDARD, Engine as _};
use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputData};
use common_catalog::consts::MITO_ENGINE;
use common_grpc::channel_manager::ClientTlsOption;
@@ -336,7 +337,7 @@ pub async fn test_otel_arrow_auth(store_type: StorageType) {
let mut request = Request::new(stream);
request.metadata_mut().insert(
"authorization",
MetadataValue::from_static("Basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="), // greptime_user:greptime_pwd base64 encoded
MetadataValue::try_from(basic_auth("greptime_user", "greptime_pwd")).unwrap(),
);
let response = client.arrow_metrics(request).await;
assert!(response.is_ok());
@@ -356,7 +357,8 @@ pub async fn test_otel_arrow_auth(store_type: StorageType) {
let mut request = Request::new(stream);
request.metadata_mut().insert(
"authorization",
MetadataValue::from_static("Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="), // greptime_user:greptime_pwd base64 encoded
MetadataValue::try_from(basic_auth_credentials("greptime_user", "greptime_pwd"))
.unwrap(),
);
let response = client.arrow_metrics(request).await;
assert!(response.is_ok());
@@ -374,6 +376,14 @@ pub async fn test_otel_arrow_auth(store_type: StorageType) {
let _ = fe_grpc_server.shutdown().await;
}
fn basic_auth(username: &str, password: &str) -> String {
format!("Basic {}", basic_auth_credentials(username, password))
}
fn basic_auth_credentials(username: &str, password: &str) -> String {
BASE64_STANDARD.encode(format!("{username}:{password}"))
}
pub async fn test_auto_create_table(store_type: StorageType) {
let (_db, fe_grpc_server) = setup_grpc_server(store_type, "test_auto_create_table").await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();

View File

@@ -21,13 +21,17 @@ use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
Label, LabelMatcher, Query, ReadRequest, ReadResponse, Sample, TimeSeries, WriteRequest,
};
use auth::user_provider_from_option;
use auth::{UserProviderRef, user_provider_from_option};
use axum::http::{HeaderName, HeaderValue, StatusCode};
use base64::prelude::{BASE64_STANDARD, Engine as _};
use chrono::Utc;
use cmd::options::GreptimeOptions;
use common_base::Plugins;
use common_catalog::consts::{
DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME, trace_operations_table_name,
trace_services_table_name,
};
use common_config::Configurable;
use common_error::status_code::StatusCode as ErrorCode;
use common_frontend::slow_query_event::{
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
@@ -60,6 +64,7 @@ use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Respon
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::prom_store::{self, mock_timeseries_new_label};
use servers::request_memory_limiter::ServerMemoryLimiter;
use standalone::options::StandaloneOptions;
use table::table_name::TableName;
use tests_integration::test_util::{
StorageType, setup_test_http_app, setup_test_http_app_with_frontend,
@@ -186,7 +191,7 @@ pub async fn test_http_auth(store_type: StorageType) {
// 2. wrong auth
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.header("Authorization", "basic Z3JlcHRpbWVfdXNlcjp3cm9uZ19wd2Q=")
.header("Authorization", basic_auth("greptime_user", "wrong_pwd"))
.send()
.await;
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
@@ -194,10 +199,7 @@ pub async fn test_http_auth(store_type: StorageType) {
// 3. right auth
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.header(
"Authorization",
"basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q=",
)
.header("Authorization", basic_auth("greptime_user", "greptime_pwd"))
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -205,19 +207,13 @@ pub async fn test_http_auth(store_type: StorageType) {
// 4. readonly user cannot write
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.header(
"Authorization",
"basic cmVhZG9ubHlfdXNlcjpyZWFkb25seV9wd2Q=",
)
.header("Authorization", basic_auth("readonly_user", "readonly_pwd"))
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/sql?db=public&sql=create table auth_test(ts timestamp time index);")
.header(
"Authorization",
"basic cmVhZG9ubHlfdXNlcjpyZWFkb25seV9wd2Q=",
)
.header("Authorization", basic_auth("readonly_user", "readonly_pwd"))
.send()
.await;
assert_eq!(res.status(), StatusCode::FORBIDDEN);
@@ -227,7 +223,7 @@ pub async fn test_http_auth(store_type: StorageType) {
.get("/v1/sql?db=public&sql=show tables;")
.header(
"Authorization",
"basic d3JpdGVvbmx5X3VzZXI6d3JpdGVvbmx5X3B3ZA==",
basic_auth("writeonly_user", "writeonly_pwd"),
)
.send()
.await;
@@ -236,7 +232,7 @@ pub async fn test_http_auth(store_type: StorageType) {
.get("/v1/sql?db=public&sql=create table auth_test(ts timestamp time index);")
.header(
"Authorization",
"basic d3JpdGVvbmx5X3VzZXI6d3JpdGVvbmx5X3B3ZA==",
basic_auth("writeonly_user", "writeonly_pwd"),
)
.send()
.await;
@@ -245,7 +241,7 @@ pub async fn test_http_auth(store_type: StorageType) {
.get("/v1/sql?db=public&sql=insert into auth_test values(1);")
.header(
"Authorization",
"basic d3JpdGVvbmx5X3VzZXI6d3JpdGVvbmx5X3B3ZA==",
basic_auth("writeonly_user", "writeonly_pwd"),
)
.send()
.await;
@@ -254,7 +250,7 @@ pub async fn test_http_auth(store_type: StorageType) {
.get("/v1/sql?db=public&sql=select * from auth_test;")
.header(
"Authorization",
"basic d3JpdGVvbmx5X3VzZXI6d3JpdGVvbmx5X3B3ZA==",
basic_auth("writeonly_user", "writeonly_pwd"),
)
.send()
.await;
@@ -263,6 +259,65 @@ pub async fn test_http_auth(store_type: StorageType) {
guard.remove_all().await;
}
fn basic_auth(username: &str, password: &str) -> String {
format!(
"basic {}",
BASE64_STANDARD.encode(format!("{username}:{password}"))
)
}
pub async fn test_http_auth_from_standalone_user_provider_config() {
common_telemetry::init_default_ut_logging();
let config = tempfile::NamedTempFile::new().unwrap();
let user_provider = "static_user_provider:cmd:greptime_user=greptime_pwd";
std::fs::write(
config.path(),
format!("user_provider = \"{user_provider}\"\n"),
)
.unwrap();
let options =
GreptimeOptions::<StandaloneOptions>::load_layered_options(config.path().to_str(), "")
.unwrap();
let fe_opts = options.component.frontend_options();
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
.await
.unwrap();
let user_provider = plugins.get::<UserProviderRef>();
let (app, mut guard) = setup_test_http_app_with_frontend_and_user_provider(
StorageType::File,
"sql_api_user_provider_config",
user_provider,
)
.await;
let client = TestClient::new(app).await;
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.send()
.await;
let status = res.status();
let body = res.text().await;
assert_eq!(
status,
StatusCode::UNAUTHORIZED,
"unexpected response body: {body}"
);
let res = client
.get("/v1/sql?db=public&sql=show tables;")
.header("Authorization", basic_auth("greptime_user", "greptime_pwd"))
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
guard.remove_all().await;
}
#[tokio::test]
pub async fn test_cors() {
let (app, mut guard) = setup_test_http_app_with_frontend(StorageType::File, "test_cors").await;

View File

@@ -33,6 +33,11 @@ grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
#[tokio::test(flavor = "multi_thread")]
async fn test_http_auth_from_standalone_user_provider_config() {
http::test_http_auth_from_standalone_user_provider_config().await;
}
sql_tests!(File);
region_migration_tests!(File);