Compare commits

..

6 Commits

Author SHA1 Message Date
Nikita Kalyanov
e29fbb96b5 idempotency 2023-11-17 08:56:03 +01:00
Nikita Kalyanov
90357fbc45 match more precise 2023-11-16 21:24:13 +01:00
Nikita Kalyanov
8987de089c fix query usage 2023-11-16 21:04:27 +01:00
Nikita Kalyanov
c61fca9a5f fmt 2023-11-16 18:52:55 +01:00
Nikita Kalyanov
c83722f348 API 2023-11-16 17:09:03 +01:00
khanova
6b82f22ada Collect number of connections by sni type (#5867)
## Problem

We don't know the number of users with the different kinds of
authentication: ["sni", "endpoint in options" (A and B from
[here](https://neon.tech/docs/connect/connection-errors)),
"password_hack"]

## Summary of changes

Collect metrics by sni kind.
2023-11-16 12:19:13 +00:00
12 changed files with 98 additions and 448 deletions

View File

@@ -83,7 +83,6 @@ pub struct ComputeState {
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub merge_src_connstr: Option<String>,
pub metrics: ComputeMetrics,
}
@@ -95,7 +94,6 @@ impl ComputeState {
last_active: None,
error: None,
pspec: None,
merge_src_connstr: None,
metrics: ComputeMetrics::default(),
}
}
@@ -117,6 +115,16 @@ pub struct ParsedSpec {
pub storage_auth_token: Option<String>,
}
/// Request body for `POST /ensure_row_level_sec`: parameters used to enable
/// row-level security on a table and create a policy-scoped user.
#[derive(Clone, Debug, serde::Deserialize)]
pub struct RowLevelParams {
    // Table to enable ROW LEVEL SECURITY on.
    pub table_name: String,
    // Group/role the new user joins and the policy is granted TO.
    pub role: String,
    // Name of the user to (re)create.
    pub user_name: String,
    // Password for the new user (arrives in plain text in the request body).
    pub password: String,
    // Database in which to apply the changes.
    pub database_name: String,
    // Column compared against current_user in the row-level policy.
    pub column_name: String,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
@@ -629,22 +637,6 @@ impl ComputeNode {
Ok(())
}
/// Merge two branches: connects to the local postgres and delegates the
/// actual per-database subscription setup to `handle_merge`.
#[instrument(skip_all)]
pub fn merge(&self, src_connstr: &str) -> Result<()> {
    // Synchronous (blocking) connection to our own postgres.
    let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
    handle_merge(&mut client, self.connstr.as_str(), &src_connstr)?;
    Ok(())
}
/// Mark branch as mergeable: delegates to `handle_set_mergeable`, which
/// creates logical replication slots and a publication in every database.
#[instrument(skip_all)]
pub fn set_mergeable(&self) -> Result<()> {
    // Synchronous (blocking) connection to our own postgres.
    let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
    handle_set_mergeable(&mut client, self.connstr.as_str())?;
    Ok(())
}
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip_all)]
@@ -1048,6 +1040,39 @@ LIMIT 100",
download_size
}
/// Enable row-level security on `params.table_name` in `params.database_name`
/// and (re)create a user whose rows are restricted to `params.column_name =
/// current_user`.
///
/// Returns `Ok(true)` when the SQL batch succeeded, `Ok(false)` when it
/// failed (the error is logged), and `Err` if the connection itself could not
/// be established.
pub async fn ensure_row_level_sec(&self, params: RowLevelParams) -> Result<bool> {
    // Reuse the compute's connection string but point it at the target database.
    let conn_str = self
        .connstr
        .as_str()
        .replace("/postgres", &format!("/{}", params.database_name));
    // Propagate connection errors instead of panicking (was `.unwrap()`).
    let (client, connection) = tokio_postgres::connect(&conn_str, NoTls).await?;
    // The connection object drives the socket and must be polled on its own task.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });
    // SECURITY NOTE(review): the params come from an HTTP request and are
    // interpolated straight into SQL. DDL cannot take bind parameters, but
    // the identifiers (and especially the password literal) should be
    // escaped/validated (e.g. quote_ident/quote_literal) before this endpoint
    // is exposed more widely.
    let result = client
        .batch_execute(&format!(
            "BEGIN;
ALTER TABLE {0} ENABLE ROW LEVEL SECURITY;
DROP POLICY IF EXISTS neon_row_level ON {0};
DROP ROLE IF EXISTS {1};
CREATE USER {1} WITH PASSWORD '{2}' IN GROUP {3};
CREATE POLICY neon_row_level ON {0} TO {3}
USING ({4} = current_user)
WITH CHECK ({4} = current_user);
COMMIT;",
            &params.table_name,
            &params.user_name,
            &params.password,
            &params.role,
            &params.column_name,
        ))
        .await;
    // Log the failure instead of silently collapsing it into `false`.
    if let Err(ref e) = result {
        eprintln!("ensure_row_level_sec batch failed: {}", e);
    }
    Ok(result.is_ok())
}
#[tokio::main]
pub async fn prepare_preload_libraries(
&self,

View File

@@ -31,35 +31,6 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::MergePending {
info!("got merge request");
state.status = ComputeStatus::Merging;
compute.state_changed.notify_all();
let connstr = state.merge_src_connstr.clone().unwrap();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.merge(&connstr) {
info!("could not merge compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("merge complete");
}
compute.set_status(new_status);
} else if state.status == ComputeStatus::SetMergeablePending {
info!("got set mergeable request");
state.status = ComputeStatus::SetMergeable;
compute.state_changed.notify_all();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.set_mergeable() {
info!("could not mark branch as mergeable: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("marked as mergeable");
}
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");

View File

@@ -5,7 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use crate::compute::{ComputeNode, ComputeState, ParsedSpec, RowLevelParams};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
@@ -123,30 +123,6 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// Handle branch merge request
(&Method::POST, "/merge") => {
info!("serving /merge POST request");
match handle_merge_request(req, compute).await {
Ok(msg) => Response::new(Body::from(msg)),
Err((msg, code)) => {
error!("error handling /merge request: {msg}");
render_json_error(&msg, code)
}
}
}
// Handle branch set mergeable request
(&Method::POST, "/set_mergeable") => {
info!("serving /set_mergeable POST request");
match handle_set_mergeable_request(compute).await {
Ok(msg) => Response::new(Body::from(msg)),
Err((msg, code)) => {
error!("error handling /set_mergeable request: {msg}");
render_json_error(&msg, code)
}
}
}
// download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
@@ -223,6 +199,31 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
}
(&Method::POST, "/ensure_row_level_sec") => {
info!("serving /ensure_row_level_sec GET request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!("compute is not running, current status: {:?}", status);
error!(msg);
let mut err_resp = Response::new(Body::from(msg));
*err_resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return err_resp;
}
let body_bytes: Vec<u8> = hyper::body::to_bytes(req.into_body()).await.unwrap().into();
let params: RowLevelParams =
serde_json::from_str(&String::from_utf8(body_bytes).unwrap()).unwrap();
let res = compute.ensure_row_level_sec(params).await;
match res {
Ok(true) => (),
_ => {
let mut err_resp = Response::new(Body::from("query failed"));
*err_resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return err_resp;
}
}
Response::new(Body::from(""))
}
// Return the `404 Not Found` for any other routes.
_ => {
@@ -233,103 +234,6 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
/// Handle `POST /merge`: record the source connection string, flip the
/// compute into `MergePending`, and block until the configurator thread has
/// driven the status back to `Running` (or `Failed`).
///
/// The request body is the plain-text connection string of the branch to
/// merge from. Returns the serialized final compute status on success, or a
/// `(message, status_code)` pair for an HTTP error response.
async fn handle_merge_request(
    req: Request<Body>,
    compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
    // NOTE(review): both unwraps panic on a malformed body; a 400 response
    // would be friendlier.
    let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
    let connstr = String::from_utf8(body_bytes.to_vec()).unwrap();
    let c = compute.clone();
    {
        let mut state = compute.state.lock().unwrap();
        // Only an idle compute (Empty or Running) may start a merge.
        if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
            let msg = format!(
                "invalid compute status for merge request: {:?}",
                state.status.clone()
            );
            return Err((msg, StatusCode::PRECONDITION_FAILED));
        }
        state.merge_src_connstr = Some(connstr);
        state.status = ComputeStatus::MergePending;
        compute.state_changed.notify_all();
        drop(state);
        info!("set new spec and notified waiters");
    }
    // Condvar waits block the thread, so run the wait loop off the async
    // executor on the blocking thread pool.
    task::spawn_blocking(move || {
        let mut state = c.state.lock().unwrap();
        while state.status != ComputeStatus::Running {
            state = c.state_changed.wait(state).unwrap();
            info!(
                "waiting for compute to become Running, current status: {:?}",
                state.status
            );
            if state.status == ComputeStatus::Failed {
                let err = state.error.as_ref().map_or("unknown error", |x| x);
                let msg = format!("compute configuration failed: {:?}", err);
                return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
            }
        }
        Ok(())
    })
    .await
    .unwrap()?;
    // Snapshot the final state and return it as JSON.
    let state = compute.state.lock().unwrap().clone();
    let status_response = status_response_from_state(&state);
    Ok(serde_json::to_string(&status_response).unwrap())
}
/// Handle `POST /set_mergeable`: flip the compute into
/// `SetMergeablePending` and block until the configurator thread has driven
/// the status back to `Running` (or `Failed`).
///
/// Returns the serialized final compute status on success, or a
/// `(message, status_code)` pair for an HTTP error response.
async fn handle_set_mergeable_request(
    compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
    let c = compute.clone();
    {
        let mut state = compute.state.lock().unwrap();
        // Only an idle compute (Empty or Running) may accept the request.
        if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
            // Fixed copy-pasted error text: this is the set_mergeable
            // endpoint, not /merge.
            let msg = format!(
                "invalid compute status for set_mergeable request: {:?}",
                state.status.clone()
            );
            return Err((msg, StatusCode::PRECONDITION_FAILED));
        }
        state.status = ComputeStatus::SetMergeablePending;
        compute.state_changed.notify_all();
        drop(state);
        info!("set new spec and notified waiters");
    }
    // Condvar waits block the thread, so run the wait loop off the async
    // executor on the blocking thread pool.
    task::spawn_blocking(move || {
        let mut state = c.state.lock().unwrap();
        while state.status != ComputeStatus::Running {
            state = c.state_changed.wait(state).unwrap();
            info!(
                "waiting for compute to become Running, current status: {:?}",
                state.status
            );
            if state.status == ComputeStatus::Failed {
                let err = state.error.as_ref().map_or("unknown error", |x| x);
                let msg = format!("compute configuration failed: {:?}", err);
                return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
            }
        }
        Ok(())
    })
    .await
    .unwrap()?;
    // Snapshot the final state and return it as JSON.
    let state = compute.state.lock().unwrap().clone();
    let status_response = status_response_from_state(&state);
    Ok(serde_json::to_string(&status_response).unwrap())
}
async fn handle_configure_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,

View File

@@ -85,61 +85,6 @@ paths:
description: Error text or 'true' if check passed.
example: "true"
/merge:
post:
tags:
- Merge
summary: Merge branches.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later.
operationId: mergeBranches
requestBody:
description: connection string of target branch
required: true
content:
text/plain:
schema:
type: string
responses:
200:
description: Merge finished.
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
500:
description: Merge request failed.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/set_mergeable:
post:
tags:
- Set mergeable
summary: Mark branch as mergeable.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later.
operationId: markBranchMergeable
responses:
200:
description: Branch marked as mergeable
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
500:
description: Set mergeable request failed.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/configure:
post:
tags:

View File

@@ -407,58 +407,6 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
Ok(())
}
/// Merge a source branch into the destination by creating, in every
/// non-template database, a logical-replication subscription pointing at the
/// matching database on the source branch. Assumes `handle_set_mergeable`
/// already created the `merge_slot_<db>` slot and `pub_merge` publication on
/// the source.
#[instrument(skip_all)]
pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> {
    info!("Merge branch into {}", dst_connstr);
    let existing_dbs = get_existing_dbs(client)?;
    for (_, db) in existing_dbs {
        if db.name.starts_with("template") {
            continue;
        }
        // Connect to the same-named database on the destination branch.
        let mut dst_conf = Config::from_str(dst_connstr)?;
        dst_conf.dbname(&db.name);
        let mut sub_client = dst_conf.connect(NoTls)?;
        // Point the source connstr at this database by swapping the last
        // path component. NOTE(review): fragile if the connstr carries query
        // parameters; a Config-based rewrite would be more robust.
        // (Removed a `src_conf` Config that was built here and never used.)
        let mut connstr_parts: Vec<&str> = src_connstr.split('/').collect();
        connstr_parts.pop();
        connstr_parts.push(&db.name);
        let connstr = connstr_parts.join("/");
        let create_sub = format!("create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name=merge_slot_{}, copy_data=false)", connstr, &db.name);
        sub_client.simple_query(&create_sub)?;
    }
    Ok(())
}
/// Mark a branch as mergeable: for every non-template database, pin WAL with
/// a logical replication slot and publish all tables so a later merge can
/// subscribe to them.
#[instrument(skip_all)]
pub fn handle_set_mergeable(client: &mut Client, connstr: &str) -> Result<()> {
    info!("Mark branch as mergeable");
    for (_, db) in get_existing_dbs(client)? {
        // Template databases are skipped.
        if db.name.starts_with("template") {
            continue;
        }
        // Open a per-database connection using the shared connstr.
        let mut per_db_conf = Config::from_str(connstr)?;
        per_db_conf.dbname(&db.name);
        let mut per_db_client = per_db_conf.connect(NoTls)?;
        // Create the slot the merge subscription will attach to...
        per_db_client.simple_query(&format!(
            "select pg_create_logical_replication_slot('merge_slot_{}', 'pgoutput')",
            &db.name
        ))?;
        // ...and publish every table in this database.
        per_db_client.simple_query("create publication pub_merge for all tables")?;
    }
    Ok(())
}
/// It follows mostly the same logic as `handle_roles()` excepting that we
/// does not use an explicit transactions block, since major database operations
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level

View File

@@ -587,31 +587,6 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
timeline_info.timeline_id
);
}
Some(("merge", branch_match)) => {
let src_endpoint_id = branch_match
.get_one::<String>("src-endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No source endpoint provided"))?;
let dst_endpoint_id = branch_match
.get_one::<String>("dst-endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No destination endpoint provided"))?;
let cplane = ComputeControlPlane::load(env.clone())?;
let src_endpoint = cplane.endpoints.get(src_endpoint_id).unwrap();
let dst_endpoint = cplane.endpoints.get(dst_endpoint_id).unwrap();
dst_endpoint.merge_from(src_endpoint)?;
}
Some(("set_mergeable", branch_match)) => {
let endpoint_id = branch_match
.get_one::<String>("endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No endpoint provided"))?;
let cplane = ComputeControlPlane::load(env.clone())?;
let endpoint = cplane.endpoints.get(endpoint_id).unwrap();
endpoint.set_mergeable()?;
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
None => bail!("no tenant subcommand provided"),
}
@@ -1330,15 +1305,6 @@ fn cli() -> Command {
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
.subcommand(Command::new("merge")
.about("Merge changes from one branch into another")
.arg(Arg::new("src-endpoint").long("src-endpoint").help("Source endpoint for merge").required(true))
.arg(Arg::new("dst-endpoint").long("dst-endpoint").help("Destination endpoint for merge").required(true))
)
.subcommand(Command::new("set_mergeable")
.about("Mark branch as mergeable")
.arg(Arg::new("endpoint").long("endpoint").help("Enpoint to be marked as mergeable").required(true))
)
.subcommand(Command::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())

View File

@@ -572,11 +572,7 @@ impl Endpoint {
}
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::MergePending
| ComputeStatus::Merging
| ComputeStatus::SetMergeablePending
| ComputeStatus::SetMergeable => {
| ComputeStatus::Configuration => {
bail!("unexpected compute status: {:?}", state.status)
}
}
@@ -678,51 +674,6 @@ impl Endpoint {
}
}
/// Ask this endpoint's compute HTTP API (`POST /merge`) to merge changes
/// from `merge_from`, passing the source connection string as the body.
pub fn merge_from(&self, merge_from: &Arc<Endpoint>) -> Result<()> {
    let url = format!(
        "http://{}:{}/merge",
        self.http_address.ip(),
        self.http_address.port()
    );
    let response = reqwest::blocking::Client::new()
        .post(url)
        .body(merge_from.connstr())
        .send()?;
    let status = response.status();
    // Any 4xx/5xx is surfaced as an error with the server's body if we can
    // read it, otherwise a generic HTTP-status message.
    if status.is_client_error() || status.is_server_error() {
        let url = response.url().to_owned();
        let msg = match response.text() {
            Ok(err_body) => format!("Error: {}", err_body),
            Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
        };
        Err(anyhow::anyhow!(msg))
    } else {
        Ok(())
    }
}
/// Ask this endpoint's compute HTTP API (`POST /set_mergeable`) to mark the
/// branch as mergeable.
pub fn set_mergeable(&self) -> Result<()> {
    let url = format!(
        "http://{}:{}/set_mergeable",
        self.http_address.ip(),
        self.http_address.port()
    );
    let response = reqwest::blocking::Client::new().post(url).send()?;
    let status = response.status();
    // Any 4xx/5xx is surfaced as an error with the server's body if we can
    // read it, otherwise a generic HTTP-status message.
    if status.is_client_error() || status.is_server_error() {
        let url = response.url().to_owned();
        let msg = match response.text() {
            Ok(err_body) => format!("Error: {}", err_body),
            Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
        };
        Err(anyhow::anyhow!(msg))
    } else {
        Ok(())
    }
}
pub fn stop(&self, destroy: bool) -> Result<()> {
// If we are going to destroy data directory,
// use immediate shutdown mode, otherwise,

View File

@@ -48,14 +48,6 @@ pub enum ComputeStatus {
Running,
// New spec is being applied.
Configuration,
// Merge requested
MergePending,
// Set mergeable requested
SetMergeablePending,
// Merge in progress
Merging,
// Set mergeable in progress
SetMergeable,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.

View File

@@ -1,7 +1,9 @@
//! User credentials used in authentication.
use crate::{
auth::password_hack::parse_endpoint_param, error::UserFacingError, proxy::neon_options,
auth::password_hack::parse_endpoint_param,
error::UserFacingError,
proxy::{neon_options, NUM_CONNECTION_ACCEPTED_BY_SNI},
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -124,6 +126,22 @@ impl<'a> ClientCredentials<'a> {
.transpose()?;
info!(user, project = project.as_deref(), "credentials");
if sni.is_some() {
info!("Connection with sni");
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["sni"])
.inc();
} else if project.is_some() {
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["no_sni"])
.inc();
info!("Connection without sni");
} else {
NUM_CONNECTION_ACCEPTED_BY_SNI
.with_label_values(&["password_hack"])
.inc();
info!("Connection with password hack");
}
let cache_key = format!(
"{}{}",

View File

@@ -129,6 +129,15 @@ pub static RATE_LIMITER_LIMIT: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});
// Counts accepted proxy connections by how the endpoint was determined;
// callers label it with "sni", "no_sni", or "password_hack".
pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "proxy_accepted_connections_by_sni",
        "Number of connections (per sni).",
        &["kind"],
    )
    .unwrap()
});
pub struct LatencyTimer {
// time since the stopwatch was started
start: Option<Instant>,

View File

@@ -1154,42 +1154,6 @@ class NeonCli(AbstractNeonCli):
kwargs["local_binpath"] = True
return super().raw_cli(*args, **kwargs)
def merge(self, src_endpoint: Endpoint, dst_endpoint: Endpoint):
    """Merge the source branch into the destination via `timeline merge`."""
    # Build the CLI invocation and fail loudly on a non-zero exit code.
    cli_args = [
        "timeline",
        "merge",
        "--src-endpoint",
        str(src_endpoint.endpoint_id),
        "--dst-endpoint",
        str(dst_endpoint.endpoint_id),
    ]
    self.raw_cli(cli_args).check_returncode()
def set_mergeable(
    self,
    endpoint: Endpoint):
    """
    Mark the branch served by `endpoint` as mergeable.
    """
    args = [
        "timeline",
        "set_mergeable",
        "--endpoint",
        str(endpoint.endpoint_id),
    ]
    res = self.raw_cli(args)
    res.check_returncode()
def create_tenant(
self,
tenant_id: Optional[TenantId] = None,

View File

@@ -1,43 +0,0 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import query_scalar
#
# Merge ancestor branch with the main branch.
#
def test_merge(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()
    # Create a fresh tenant for this test.
    tenant, _ = env.neon_cli.create_tenant()
    main_branch = env.endpoints.create_start("main", tenant_id=tenant)
    main_cur = main_branch.connect().cursor()
    # Create table and insert some data on main.
    main_cur.execute("CREATE TABLE t(x bigint primary key)")
    main_cur.execute("INSERT INTO t values(generate_series(1, 10000))");
    # Create branch ws off main.
    env.neon_cli.create_branch("ws", "main", tenant_id=tenant)
    ws_branch = env.endpoints.create_start("ws", tenant_id=tenant)
    log.info("postgres is running on 'ws' branch")
    # Mark branch ws as mergeable: this creates logical replication slots and pins WAL.
    env.neon_cli.set_mergeable(ws_branch)
    # Insert more data in the branch.
    ws_cur = ws_branch.connect().cursor()
    ws_cur.execute("INSERT INTO t values(generate_series(10001, 20000))")
    # Merge ws branch into main.
    env.neon_cli.merge(ws_branch, main_branch)
    # Sleep for some time until changes are applied.
    # NOTE(review): a fixed sleep is flaky; polling the row count would be sturdier.
    time.sleep(2)
    # Check that changes are merged.
    assert query_scalar(main_cur, "SELECT count(*) from t") == 20000