Compare commits

...

8 Commits

Author SHA1 Message Date
Conrad Ludgate
dc8ca6aaa1 fix dbname 2024-10-29 07:55:14 +00:00
Conrad Ludgate
af50fd76b7 fix user 2024-10-29 07:22:07 +00:00
Conrad Ludgate
da16233f64 fixup 2024-10-28 18:41:07 +00:00
Conrad Ludgate
80466bdca2 remove postgres auth backend from proxy tests 2024-10-28 18:29:45 +00:00
Erik Grinaker
248558dee8 safekeeper: refactor WalAcceptor to be event-driven (#9462)
## Problem

The `WalAcceptor` main loop currently uses two nested loops to consume
inbound messages. This makes it hard to slot in periodic events like
metrics collection. It also duplicates the event processing code, and assumes
all messages in steady state are AppendRequests (other messages types may
be dropped if following an AppendRequest).

## Summary of changes

Refactor the `WalAcceptor` loop to be event driven.
2024-10-28 17:18:37 +00:00
Sergey Melnikov
3bad52543f We don't have legacy proxies anymore (#9544)
We don't have legacy scram proxies anymore:
cc: https://github.com/neondatabase/cloud/issues/9745
2024-10-28 16:42:35 +00:00
Tristan Partin
3d64a7ddcd Add pg_mooncake to compute-node.Dockerfile
Signed-off-by: Tristan Partin <tristan@neon.tech>
2024-10-28 11:23:30 -05:00
Conrad Ludgate
25f1e5cfeb [proxy] demote warnings and remove dead-argument (#9512)
fixes https://github.com/neondatabase/cloud/issues/19000
2024-10-28 15:02:20 +00:00
7 changed files with 216 additions and 124 deletions

View File

@@ -839,6 +839,7 @@ jobs:
- name: Build vm image
run: |
./vm-builder \
-size=2G \
-spec=compute/vm-image-spec-${{ matrix.version.debian }}.yaml \
-src=neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
-dst=neondatabase/vm-compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}
@@ -1118,7 +1119,6 @@ jobs:
-f deployPgSniRouter=true \
-f deployProxyLink=true \
-f deployPrivatelinkProxy=true \
-f deployLegacyProxyScram=true \
-f deployProxyScram=true \
-f deployProxyAuthBroker=true \
-f branch=main \

View File

@@ -666,7 +666,7 @@ RUN apt-get update && \
#
# Use new version only for v17
# because Release_2024_09_1 has some backward incompatible changes
# https://github.com/rdkit/rdkit/releases/tag/Release_2024_09_1
# https://github.com/rdkit/rdkit/releases/tag/Release_2024_09_1
ENV PATH="/usr/local/pgsql/bin/:/usr/local/pgsql/:$PATH"
RUN case "${PG_VERSION}" in \
"v17") \
@@ -860,13 +860,14 @@ ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
RUN case "${PG_VERSION}" in "v17") \
echo "v17 is not supported yet by pgrx. Quit" && exit 0;; \
esac && \
curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
case "${PG_VERSION}" in \
'v17') \
echo 'v17 is not supported yet by pgrx. Quit' && exit 0;; \
esac && \
cargo install --locked --version 0.11.3 cargo-pgrx && \
/bin/bash -c 'cargo pgrx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
@@ -1041,6 +1042,31 @@ RUN wget https://github.com/pgpartman/pg_partman/archive/refs/tags/v5.1.0.tar.gz
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_partman.control
#########################################################################################
#
# Layer "pg_mooncake"
# compile pg_mooncake extension
#
#########################################################################################
FROM rust-extensions-build AS pg-mooncake-build
ARG PG_VERSION
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PG_MOONCAKE_VERSION=0a7de4c0b5c7b1a5e2175e1c5f4625b97b7346f1
ENV PATH="/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in \
'v14') \
echo "pg_mooncake is not supported on Postgres ${PG_VERSION}" && exit 0;; \
esac && \
git clone --depth 1 --branch neon https://github.com/Mooncake-Labs/pg_mooncake.git pg_mooncake-src && \
cd pg_mooncake-src && \
git checkout "${PG_MOONCAKE_VERSION}" && \
git submodule update --init --depth 1 --recursive && \
make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) && \
make BUILD_TYPE=release -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
@@ -1084,6 +1110,7 @@ COPY --from=wal2json-pg-build /usr/local/pgsql /usr/local/pgsql
COPY --from=pg-anon-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -1,5 +1,5 @@
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use tracing::{debug, info};
use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
use crate::auth::{self, AuthFlow};
@@ -21,7 +21,7 @@ pub(crate) async fn authenticate_cleartext(
secret: AuthSecret,
config: &'static AuthenticationConfig,
) -> auth::Result<ComputeCredentials> {
warn!("cleartext auth flow override is enabled, proceeding");
debug!("cleartext auth flow override is enabled, proceeding");
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
// pause the timer while we communicate with the client
@@ -61,7 +61,7 @@ pub(crate) async fn password_hack_no_authentication(
info: ComputeUserInfoNoEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
) -> auth::Result<(ComputeUserInfo, Vec<u8>)> {
warn!("project not specified, resorting to the password hack auth flow");
debug!("project not specified, resorting to the password hack auth flow");
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
// pause the timer while we communicate with the client

View File

@@ -137,9 +137,6 @@ struct ProxyCliArgs {
/// size of the threadpool for password hashing
#[clap(long, default_value_t = 4)]
scram_thread_pool_size: u8,
/// Disable dynamic rate limiter and store the metrics to ensure its production behaviour.
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
disable_dynamic_rate_limiter: bool,
/// Endpoint rate limiter max number of requests per second.
///
/// Provided in the form `<Requests Per Second>@<Bucket Duration Size>`.
@@ -615,9 +612,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
and metric-collection-interval must be specified"
),
};
if !args.disable_dynamic_rate_limiter {
bail!("dynamic rate limiter should be disabled");
}
let config::ConcurrencyLockOptions {
shards,

View File

@@ -21,18 +21,15 @@ use postgres_backend::QueryError;
use pq_proto::BeMessage;
use serde::Deserialize;
use serde::Serialize;
use std::future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::{Duration, MissedTickBehavior};
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
@@ -444,9 +441,9 @@ async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
}
}
// Send keepalive messages to walproposer, to make sure it receives updates
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
/// walproposer, even when it's writing a steady stream of messages.
const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// Encapsulates a task which takes messages from msg_rx, processes and pushes
/// replies to reply_tx.
@@ -494,67 +491,76 @@ impl WalAcceptor {
async fn run(&mut self) -> anyhow::Result<()> {
let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
// After this timestamp we will stop processing AppendRequests and send a response
// to the walproposer. walproposer sends at least one AppendRequest per second,
// we will send keepalives by replying to these requests once per second.
let mut next_keepalive = Instant::now();
// Periodically flush the WAL.
let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
flush_ticker.tick().await; // skip the initial, immediate tick
while let Some(mut next_msg) = self.msg_rx.recv().await {
// Update walreceiver state in shmem for reporting.
if let ProposerAcceptorMessage::Elected(_) = &next_msg {
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
}
// Tracks unflushed appends.
let mut dirty = false;
let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
// Loop through AppendRequests while available to write as many WAL records as
// possible without fsyncing.
//
// Make sure the WAL is flushed before returning, see:
// https://github.com/neondatabase/neon/issues/9259
//
// Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
// Otherwise, we might end up in a situation where we read a message, but don't
// process it.
while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
if self.reply_tx.send(reply).await.is_err() {
break; // disconnected, flush WAL and return on next send/recv
}
}
// get out of this loop if keepalive time is reached
if Instant::now() >= next_keepalive {
loop {
let reply = tokio::select! {
// Process inbound message.
msg = self.msg_rx.recv() => {
// If disconnected, break to flush WAL and return.
let Some(mut msg) = msg else {
break;
};
// Update walreceiver state in shmem for reporting.
if let ProposerAcceptorMessage::Elected(_) = &msg {
walreceiver_guard.get().status = WalReceiverStatus::Streaming;
}
// continue pulling AppendRequests if available
match self.msg_rx.try_recv() {
Ok(msg) => next_msg = msg,
Err(TryRecvError::Empty) => break,
// on disconnect, flush WAL and return on next send/recv
Err(TryRecvError::Disconnected) => break,
};
// Don't flush the WAL on every append, only periodically via flush_ticker.
// This batches multiple appends per fsync. If the channel is empty after
// sending the reply, we'll schedule an immediate flush.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
dirty = true;
}
self.tli.process_msg(&msg).await?
}
// flush all written WAL to the disk
self.tli
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?
} else {
// process message other than AppendRequest
self.tli.process_msg(&next_msg).await?
// While receiving AppendRequests, flush the WAL periodically and respond with an
// AppendResponse to let walproposer know we're still alive.
_ = flush_ticker.tick(), if dirty => {
dirty = false;
self.tli
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?
}
// If there are no pending messages, flush the WAL immediately.
//
// TODO: this should be done via flush_ticker.reset_immediately(), but that's always
// delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
_ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
dirty = false;
flush_ticker.reset();
self.tli
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?
}
};
if let Some(reply) = reply_msg {
// Send reply, if any.
if let Some(reply) = reply {
if self.reply_tx.send(reply).await.is_err() {
return Ok(()); // chan closed, streaming terminated
break; // disconnected, break to flush WAL and return
}
// reset keepalive time
next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
}
}
// Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
if dirty {
self.tli
.process_msg(&ProposerAcceptorMessage::FlushWAL)
.await?;
}
Ok(())
}
}

View File

@@ -40,6 +40,7 @@ from _pytest.fixtures import FixtureRequest
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from pytest_httpserver import HTTPServer
from urllib3.util.retry import Retry
from fixtures import overlayfs
@@ -3098,10 +3099,6 @@ class NeonProxy(PgProtocol):
class AuthBackend(abc.ABC):
"""All auth backends must inherit from this class"""
@property
def default_conn_url(self) -> Optional[str]:
return None
@abc.abstractmethod
def extra_args(self) -> list[str]:
pass
@@ -3115,7 +3112,7 @@ class NeonProxy(PgProtocol):
*["--allow-self-signed-compute", "true"],
]
class Console(AuthBackend):
class ControlPlane(AuthBackend):
def __init__(self, endpoint: str, fixed_rate_limit: Optional[int] = None):
self.endpoint = endpoint
self.fixed_rate_limit = fixed_rate_limit
@@ -3139,21 +3136,6 @@ class NeonProxy(PgProtocol):
]
return args
@dataclass(frozen=True)
class Postgres(AuthBackend):
pg_conn_url: str
@property
def default_conn_url(self) -> Optional[str]:
return self.pg_conn_url
def extra_args(self) -> list[str]:
return [
# Postgres auth backend params
*["--auth-backend", "postgres"],
*["--auth-endpoint", self.pg_conn_url],
]
def __init__(
self,
neon_binpath: Path,
@@ -3168,7 +3150,7 @@ class NeonProxy(PgProtocol):
):
host = "127.0.0.1"
domain = "proxy.localtest.me" # resolves to 127.0.0.1
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
super().__init__(host=domain, port=proxy_port)
self.domain = domain
self.host = host
@@ -3422,20 +3404,39 @@ def static_proxy(
port_distributor: PortDistributor,
neon_binpath: Path,
test_output_dir: Path,
httpserver: HTTPServer,
) -> Iterator[NeonProxy]:
"""Neon proxy that routes directly to vanilla postgres."""
"""Neon proxy that routes directly to vanilla postgres and a mocked cplane HTTP API."""
port = vanilla_pg.default_options["port"]
host = vanilla_pg.default_options["host"]
dbname = vanilla_pg.default_options["dbname"]
auth_endpoint = f"postgres://proxy:password@{host}:{port}/{dbname}"
# For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
vanilla_pg.start()
vanilla_pg.safe_psql("create user proxy with login superuser password 'password'")
vanilla_pg.safe_psql("CREATE SCHEMA IF NOT EXISTS neon_control_plane")
vanilla_pg.safe_psql(
"CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
[(rolpassword,)] = vanilla_pg.safe_psql(
"select rolpassword from pg_catalog.pg_authid where rolname = 'proxy'"
)
# return local postgres addr on ProxyWakeCompute.
httpserver.expect_request("/cplane/proxy_wake_compute").respond_with_json(
{
"address": f"{host}:{port}",
"aux": {
"endpoint_id": "ep-foo-bar-1234",
"branch_id": "br-foo-bar",
"project_id": "foo-bar",
},
}
)
# return local postgres addr on ProxyWakeCompute.
httpserver.expect_request("/cplane/proxy_get_role_secret").respond_with_json(
{
"role_secret": rolpassword,
"allowed_ips": None,
"project_id": "foo-bar",
}
)
proxy_port = port_distributor.get_port()
@@ -3450,8 +3451,12 @@ def static_proxy(
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Postgres(auth_endpoint),
auth_backend=NeonProxy.ControlPlane(httpserver.url_for("/cplane")),
) as proxy:
proxy.default_options["user"] = "proxy"
proxy.default_options["password"] = "password"
proxy.default_options["dbname"] = dbname
proxy.start()
yield proxy

View File

@@ -6,20 +6,27 @@ from fixtures.neon_fixtures import (
NeonProxy,
VanillaPostgres,
)
from pytest_httpserver import HTTPServer
TABLE_NAME = "neon_control_plane.endpoints"
# Proxy uses the same logic for psql and websockets.
@pytest.mark.asyncio
async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: VanillaPostgres):
# Shouldn't be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')"
def test_proxy_psql_not_allowed_ips(
static_proxy: NeonProxy,
vanilla_pg: VanillaPostgres,
httpserver: HTTPServer,
):
[(rolpassword,)] = vanilla_pg.safe_psql(
"select rolpassword from pg_catalog.pg_authid where rolname = 'proxy'"
)
# Should be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')"
# Shouldn't be able to connect to this project
httpserver.expect_request("/cplane/proxy_get_role_secret").respond_with_json(
{
"role_secret": rolpassword,
"allowed_ips": ["8.8.8.8"],
"project_id": "foo-bar",
}
)
def check_cannot_connect(**kwargs):
@@ -37,6 +44,25 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
# with SNI
check_cannot_connect(query="select 1", host="private-project.localtest.me")
def test_proxy_psql_allowed_ips(
static_proxy: NeonProxy,
vanilla_pg: VanillaPostgres,
httpserver: HTTPServer,
):
[(rolpassword,)] = vanilla_pg.safe_psql(
"select rolpassword from pg_catalog.pg_authid where rolname = 'proxy'"
)
# Should be able to connect to this project
httpserver.expect_request("/cplane/proxy_get_role_secret").respond_with_json(
{
"role_secret": rolpassword,
"allowed_ips": ["::1", "127.0.0.1"],
"project_id": "foo-bar",
}
)
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
out = static_proxy.safe_psql(query="select 1", sslsni=0, options="project=generic-project")
assert out[0][0] == 1
@@ -50,27 +76,61 @@ async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
assert out[0][0] == 1
@pytest.mark.asyncio
async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: VanillaPostgres):
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
def test_proxy_http_not_allowed_ips(
static_proxy: NeonProxy,
vanilla_pg: VanillaPostgres,
httpserver: HTTPServer,
):
vanilla_pg.safe_psql("create user http_auth with password 'http' superuser")
# Shouldn't be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')"
[(rolpassword,)] = vanilla_pg.safe_psql(
"select rolpassword from pg_catalog.pg_authid where rolname = 'http_auth'"
)
def query(status: int, query: str, *args):
httpserver.expect_oneshot_request("/cplane/proxy_get_role_secret").respond_with_json(
{
"role_secret": rolpassword,
"allowed_ips": ["8.8.8.8"],
"project_id": "foo-bar",
}
)
with httpserver.wait() as waiting:
static_proxy.http_query(
query,
args,
"select 1;",
[],
user="http_auth",
password="http",
expected_code=status,
expected_code=400,
)
assert waiting.result
query(400, "select 1;") # ip address is not allowed
# Should be able to connect to this project
vanilla_pg.safe_psql(
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'"
def test_proxy_http_allowed_ips(
static_proxy: NeonProxy,
vanilla_pg: VanillaPostgres,
httpserver: HTTPServer,
):
vanilla_pg.safe_psql("create user http_auth with password 'http' superuser")
[(rolpassword,)] = vanilla_pg.safe_psql(
"select rolpassword from pg_catalog.pg_authid where rolname = 'http_auth'"
)
query(200, "select 1;") # should work now
httpserver.expect_oneshot_request("/cplane/proxy_get_role_secret").respond_with_json(
{
"role_secret": rolpassword,
"allowed_ips": ["8.8.8.8", "127.0.0.1"],
"project_id": "foo-bar",
}
)
with httpserver.wait() as waiting:
static_proxy.http_query(
"select 1;",
[],
user="http_auth",
password="http",
expected_code=200,
)
assert waiting.result