Compare commits

...

10 Commits

Author SHA1 Message Date
Conrad Ludgate
edea436191 only one client, so only one channel pair 2025-05-21 22:54:56 +01:00
Conrad Ludgate
4b0c7f9530 remove arc around inner client 2025-05-21 21:58:03 +01:00
Conrad Ludgate
ab61864df1 proxy(tokio-postgres): move statement cleanup to client drop 2025-05-21 21:53:57 +01:00
Peter Bendel
f9fdbc9618 remove auth_endpoint password from log and command line for local proxy mode (#11991)
## Problem

When testing local proxy the auth-endpoint password shows up in command
line and log

```bash
RUST_LOG=proxy LOGFMT=text cargo run --release --package proxy --bin proxy --features testing -- \
  --auth-backend postgres \
  --auth-endpoint 'postgresql://postgres:secret_password@127.0.0.1:5432/postgres' \
  --tls-cert server.crt \
  --tls-key server.key \
  --wss 0.0.0.0:4444
```

## Summary of changes

- Allow to set env variable PGPASSWORD
- fall back to use PGPASSWORD env variable when auth-endpoint does not
contain password
- remove auth-endpoint password from logs in `--features testing` mode

Example

```bash
export PGPASSWORD=secret_password

RUST_LOG=proxy LOGFMT=text cargo run --package proxy --bin proxy --features testing -- \
  --auth-backend postgres \
  --auth-endpoint 'postgresql://postgres@127.0.0.1:5432/postgres' \
  --tls-cert server.crt \
  --tls-key server.key \
  --wss 0.0.0.0:4444 
```
2025-05-21 20:26:05 +00:00
Erik Grinaker
95a5f749c8 pageserver: use an Option for GcCutoffs::time (#11984)
## Problem

It is not currently possible to disambiguate a timeline with an
uninitialized PITR cutoff from one that was created within the PITR
window -- both of these have `GcCutoffs::time == Lsn(0)`. For billing
metrics, we need to disambiguate these to avoid accidentally billing the
entire history when a tenant is initially loaded.

Touches https://github.com/neondatabase/cloud/issues/28155.

## Summary of changes

Make `GcCutoffs::time` an `Option<Lsn>`, and only set it to `Some` when
initialized. A `pitr_interval` of 0 will yield `Some(last_record_lsn)`.

This PR takes a conservative approach, and mostly retains the old
behavior of consumers by using `unwrap_or_default()` to yield 0 when
uninitialized, to avoid accidentally introducing bugs -- except in cases
where there is high confidence that the change is beneficial (e.g. for
the `pageserver_pitr_history_size` Prometheus metric and to return early
during GC).
2025-05-21 15:42:11 +00:00
Konstantin Merenkov
5db20af8a7 Keep the conn info cache on max_client_conn from pgbouncer (#11986)
## Problem
Hitting max_client_conn from pgbouncer would lead to invalidation of the
conn info cache.
Customers would hit the limit on wake_compute.

## Summary of changes
`should_retry_wake_compute` detects this specific error from pgbouncer
as non-retriable,
meaning we won't try to wake up the compute again.
2025-05-21 15:27:30 +00:00
Arpad Müller
136cf1979b Add metric for number of offloaded timelines (#11976)
We want to keep track of the number of offloaded timelines. It's a
per-tenant shard metric because each shard makes offloading decisions on
its own.
2025-05-21 11:28:22 +00:00
Vlad Lazar
08bb72e516 pageserver: allow in-mem reads to be planned during writes (#11937)
## Problem

Get page tracing revealed situations where planning an in-memory layer
is taking around 150ms. Upon investigation, the culprit is the inner
in-mem layer file lock. A batch being written holds the write lock and a
read being planned wants the read lock. See [this
trace](https://neonprod.grafana.net/explore?schemaVersion=1&panes=%7B%22j61%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%22412ec4522fe1750798aca54aec2680ac%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702606349%22,%22from%22:%221746681006349%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2291e9f1879c9bccc0%22%7D%7D%7D,%226d0%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%2220a4757706b16af0e1fbab83f9d2e925%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702614807%22,%22from%22:%221746681014807%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2260e7825512bc2a6b%22%7D%7D%7D%7D)
for example.

## Summary of changes

Lift the index into its own RwLock such that we can at least plan during
write IO.

I tried to be smarter in
https://github.com/neondatabase/neon/pull/11866: arc swap + structurally
shared datastructure
and that killed ingest perf for small keys.

## Benchmarking

* No statistically significant difference for rust inget benchmarks when
compared to main.
2025-05-21 11:08:49 +00:00
Alexander Sarantcev
6f4f3691a5 pageserver: Add tracing endpoint correctness check in config validation (#11970)
## Problem

When using an incorrect endpoint string - `"localhost:4317"`, it's a
runtime error, but it can be a config error
- Closes: https://github.com/neondatabase/neon/issues/11394

## Summary of changes

Add config parse time check via `request::Url::parse` validation.

---------

Co-authored-by: Aleksandr Sarantsev <ephemeralsad@gmail.com>
2025-05-21 09:03:26 +00:00
dependabot[bot]
a2b756843e chore(deps): bump setuptools from 70.0.0 to 78.1.1 in the pip group across 1 directory (#11977)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-05-20 23:00:49 +00:00
31 changed files with 695 additions and 383 deletions

40
Cargo.lock generated
View File

@@ -3898,6 +3898,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.1"
@@ -4182,6 +4192,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -5239,6 +5255,7 @@ dependencies = [
"tracing-log",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-test",
"tracing-utils",
"try-lock",
"typed-json",
@@ -7689,6 +7706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -7702,6 +7720,27 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn 2.0.100",
]
[[package]]
name = "tracing-utils"
version = "0.1.0"
@@ -8554,6 +8593,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"url",
"uuid",
"zeroize",

View File

@@ -1,14 +1,12 @@
use std::collections::HashMap;
use std::fmt;
use std::net::IpAddr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{TryStreamExt, future, ready};
use parking_lot::Mutex;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use serde::{Deserialize, Serialize};
@@ -16,7 +14,6 @@ use tokio::sync::mpsc;
use crate::codec::{BackendMessages, FrontendMessage};
use crate::config::{Host, SslMode};
use crate::connection::{Request, RequestMessages};
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, Type};
@@ -26,19 +23,43 @@ use crate::{
};
pub struct Responses {
/// new messages from conn
receiver: mpsc::Receiver<BackendMessages>,
/// current batch of messages
cur: BackendMessages,
/// number of total queries sent.
waiting: usize,
/// number of ReadyForQuery messages received.
received: usize,
}
impl Responses {
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
loop {
match self.cur.next().map_err(Error::parse)? {
Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
Some(message) => return Poll::Ready(Ok(message)),
None => {}
// get the next saved message
if let Some(message) = self.cur.next().map_err(Error::parse)? {
let received = self.received;
// increase the query head if this is the last message.
if let Message::ReadyForQuery(_) = message {
self.received += 1;
}
// check if the client has skipped this query.
if received + 1 < self.waiting {
// grab the next message.
continue;
}
// convenience: turn the error messaage into a proper error.
let res = match message {
Message::ErrorResponse(body) => Err(Error::db(body)),
message => Ok(message),
};
return Poll::Ready(res);
}
// get the next back of messages.
match ready!(self.receiver.poll_recv(cx)) {
Some(messages) => self.cur = messages,
None => return Poll::Ready(Err(Error::closed())),
@@ -65,33 +86,28 @@ pub(crate) struct CachedTypeInfo {
}
pub struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
sender: mpsc::UnboundedSender<FrontendMessage>,
responses: Responses,
/// A buffer to use when writing out postgres commands.
buffer: Mutex<BytesMut>,
buffer: BytesMut,
}
impl InnerClient {
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request { messages, sender };
self.sender.send(request).map_err(|_| Error::closed())?;
Ok(Responses {
receiver,
cur: BackendMessages::empty(),
})
pub fn send(&mut self, messages: FrontendMessage) -> Result<&mut Responses, Error> {
self.sender.send(messages).map_err(|_| Error::closed())?;
self.responses.waiting += 1;
Ok(&mut self.responses)
}
/// Call the given function with a buffer to be used when writing out
/// postgres commands.
pub fn with_buf<F, R>(&self, f: F) -> R
pub fn with_buf<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut BytesMut) -> R,
{
let mut buffer = self.buffer.lock();
let r = f(&mut buffer);
buffer.clear();
let r = f(&mut self.buffer);
self.buffer.clear();
r
}
}
@@ -109,7 +125,7 @@ pub struct SocketConfig {
/// The client is one half of what is returned when a connection is established. Users interact with the database
/// through this client object.
pub struct Client {
inner: Arc<InnerClient>,
inner: InnerClient,
cached_typeinfo: CachedTypeInfo,
socket_config: SocketConfig,
@@ -118,19 +134,39 @@ pub struct Client {
secret_key: i32,
}
impl Drop for Client {
fn drop(&mut self) {
if let Some(stmt) = self.cached_typeinfo.typeinfo.take() {
let buf = self.inner.with_buf(|buf| {
frontend::close(b'S', stmt.name(), buf).unwrap();
frontend::sync(buf);
buf.split().freeze()
});
let _ = self.inner.send(FrontendMessage::Raw(buf));
}
}
}
impl Client {
pub(crate) fn new(
sender: mpsc::UnboundedSender<Request>,
sender: mpsc::UnboundedSender<FrontendMessage>,
receiver: mpsc::Receiver<BackendMessages>,
socket_config: SocketConfig,
ssl_mode: SslMode,
process_id: i32,
secret_key: i32,
) -> Client {
Client {
inner: Arc::new(InnerClient {
inner: InnerClient {
sender,
responses: Responses {
receiver,
cur: BackendMessages::empty(),
waiting: 0,
received: 0,
},
buffer: Default::default(),
}),
},
cached_typeinfo: Default::default(),
socket_config,
@@ -145,19 +181,23 @@ impl Client {
self.process_id
}
pub(crate) fn inner(&self) -> &Arc<InnerClient> {
&self.inner
pub(crate) fn inner(&mut self) -> &mut InnerClient {
&mut self.inner
}
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
/// to save a roundtrip
pub async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
pub async fn query_raw_txt<S, I>(
&mut self,
statement: &str,
params: I,
) -> Result<RowStream, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,
I::IntoIter: ExactSizeIterator,
{
query::query_txt(&self.inner, statement, params).await
query::query_txt(&mut self.inner, statement, params).await
}
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
@@ -173,11 +213,14 @@ impl Client {
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
pub async fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
self.simple_query_raw(query).await?.try_collect().await
}
pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
pub(crate) async fn simple_query_raw(
&mut self,
query: &str,
) -> Result<SimpleQueryStream, Error> {
simple_query::simple_query(self.inner(), query).await
}
@@ -191,7 +234,7 @@ impl Client {
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
/// them to this method!
pub async fn batch_execute(&self, query: &str) -> Result<ReadyForQueryStatus, Error> {
pub async fn batch_execute(&mut self, query: &str) -> Result<ReadyForQueryStatus, Error> {
simple_query::batch_execute(self.inner(), query).await
}
@@ -208,7 +251,7 @@ impl Client {
/// The transaction will roll back by default - use the `commit` method to commit it.
pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
struct RollbackIfNotDone<'me> {
client: &'me Client,
client: &'me mut Client,
done: bool,
}
@@ -222,10 +265,7 @@ impl Client {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self
.client
.inner()
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
let _ = self.client.inner().send(FrontendMessage::Raw(buf));
}
}
@@ -239,7 +279,7 @@ impl Client {
client: self,
done: false,
};
self.batch_execute("BEGIN").await?;
cleaner.client.batch_execute("BEGIN").await?;
cleaner.done = true;
}
@@ -267,7 +307,7 @@ impl Client {
/// Query for type information
pub(crate) async fn get_type_inner(&mut self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(&self.inner, &mut self.cached_typeinfo, oid).await
crate::prepare::get_type(&mut self.inner, &mut self.cached_typeinfo, oid).await
}
/// Determines if the connection to the server has already closed.

View File

@@ -1,21 +1,16 @@
use std::io;
use bytes::{Buf, Bytes, BytesMut};
use bytes::{Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol2::message::backend;
use postgres_protocol2::message::frontend::CopyData;
use tokio_util::codec::{Decoder, Encoder};
pub enum FrontendMessage {
Raw(Bytes),
CopyData(CopyData<Box<dyn Buf + Send>>),
}
pub enum BackendMessage {
Normal {
messages: BackendMessages,
request_complete: bool,
},
Normal { messages: BackendMessages },
Async(backend::Message),
}
@@ -44,7 +39,6 @@ impl Encoder<FrontendMessage> for PostgresCodec {
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
match item {
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
FrontendMessage::CopyData(data) => data.write(dst),
}
Ok(())
@@ -57,7 +51,6 @@ impl Decoder for PostgresCodec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
let mut idx = 0;
let mut request_complete = false;
while let Some(header) = backend::Header::parse(&src[idx..])? {
let len = header.len() as usize + 1;
@@ -82,7 +75,6 @@ impl Decoder for PostgresCodec {
idx += len;
if header.tag() == backend::READY_FOR_QUERY_TAG {
request_complete = true;
break;
}
}
@@ -92,7 +84,6 @@ impl Decoder for PostgresCodec {
} else {
Ok(Some(BackendMessage::Normal {
messages: BackendMessages(src.split_to(idx)),
request_complete,
}))
}
}

View File

@@ -59,9 +59,11 @@ where
connect_timeout: config.connect_timeout,
};
let (sender, receiver) = mpsc::unbounded_channel();
let (client_tx, conn_rx) = mpsc::unbounded_channel();
let (conn_tx, client_rx) = mpsc::channel(4);
let client = Client::new(
sender,
client_tx,
client_rx,
socket_config,
config.ssl_mode,
process_id,
@@ -74,7 +76,7 @@ where
.map(|m| BackendMessage::Async(Message::NoticeResponse(m)))
.collect();
let connection = Connection::new(stream, delayed, parameters, receiver);
let connection = Connection::new(stream, delayed, parameters, conn_tx, conn_rx);
Ok((client, connection))
}

View File

@@ -4,7 +4,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{Sink, Stream, ready};
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
@@ -19,30 +18,12 @@ use crate::error::DbError;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::{AsyncMessage, Error, Notification};
pub enum RequestMessages {
Single(FrontendMessage),
}
pub struct Request {
pub messages: RequestMessages,
pub sender: mpsc::Sender<BackendMessages>,
}
pub struct Response {
sender: PollSender<BackendMessages>,
}
#[derive(PartialEq, Debug)]
enum State {
Active,
Closing,
}
enum WriteReady {
Terminating,
WaitingOnRead,
}
/// A connection to a PostgreSQL database.
///
/// This is one half of what is returned when a new connection is established. It performs the actual IO with the
@@ -56,9 +37,11 @@ pub struct Connection<S, T> {
pub stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
/// HACK: we need this in the Neon Proxy to forward params.
pub parameters: HashMap<String, String>,
receiver: mpsc::UnboundedReceiver<Request>,
sender: PollSender<BackendMessages>,
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
pending_responses: VecDeque<BackendMessage>,
responses: VecDeque<Response>,
state: State,
}
@@ -71,14 +54,15 @@ where
stream: Framed<MaybeTlsStream<S, T>, PostgresCodec>,
pending_responses: VecDeque<BackendMessage>,
parameters: HashMap<String, String>,
receiver: mpsc::UnboundedReceiver<Request>,
sender: mpsc::Sender<BackendMessages>,
receiver: mpsc::UnboundedReceiver<FrontendMessage>,
) -> Connection<S, T> {
Connection {
stream,
parameters,
sender: PollSender::new(sender),
receiver,
pending_responses,
responses: VecDeque::new(),
state: State::Active,
}
}
@@ -110,7 +94,7 @@ where
}
};
let (mut messages, request_complete) = match message {
let messages = match message {
BackendMessage::Async(Message::NoticeResponse(body)) => {
let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?;
return Poll::Ready(Ok(AsyncMessage::Notice(error)));
@@ -131,41 +115,19 @@ where
continue;
}
BackendMessage::Async(_) => unreachable!(),
BackendMessage::Normal {
messages,
request_complete,
} => (messages, request_complete),
BackendMessage::Normal { messages } => messages,
};
let mut response = match self.responses.pop_front() {
Some(response) => response,
None => match messages.next().map_err(Error::parse)? {
Some(Message::ErrorResponse(error)) => {
return Poll::Ready(Err(Error::db(error)));
}
_ => return Poll::Ready(Err(Error::unexpected_message())),
},
};
match response.sender.poll_reserve(cx) {
match self.sender.poll_reserve(cx) {
Poll::Ready(Ok(())) => {
let _ = response.sender.send_item(messages);
if !request_complete {
self.responses.push_front(response);
}
let _ = self.sender.send_item(messages);
}
Poll::Ready(Err(_)) => {
// we need to keep paging through the rest of the messages even if the receiver's hung up
if !request_complete {
self.responses.push_front(response);
}
return Poll::Ready(Err(Error::closed()));
}
Poll::Pending => {
self.responses.push_front(response);
self.pending_responses.push_back(BackendMessage::Normal {
messages,
request_complete,
});
self.pending_responses
.push_back(BackendMessage::Normal { messages });
trace!("poll_read: waiting on sender");
return Poll::Pending;
}
@@ -174,7 +136,7 @@ where
}
/// Fetch the next client request and enqueue the response sender.
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<RequestMessages>> {
fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll<Option<FrontendMessage>> {
if self.receiver.is_closed() {
return Poll::Ready(None);
}
@@ -182,10 +144,7 @@ where
match self.receiver.poll_recv(cx) {
Poll::Ready(Some(request)) => {
trace!("polled new request");
self.responses.push_back(Response {
sender: PollSender::new(request.sender),
});
Poll::Ready(Some(request.messages))
Poll::Ready(Some(request))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
@@ -194,7 +153,7 @@ where
/// Process client requests and write them to the postgres connection, flushing if necessary.
/// client -> postgres
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<WriteReady, Error>> {
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
loop {
if Pin::new(&mut self.stream)
.poll_ready(cx)
@@ -209,14 +168,14 @@ where
match self.poll_request(cx) {
// send the message to postgres
Poll::Ready(Some(RequestMessages::Single(request))) => {
Poll::Ready(Some(request)) => {
Pin::new(&mut self.stream)
.start_send(request)
.map_err(Error::io)?;
}
// No more messages from the client, and no more responses to wait for.
// Send a terminate message to postgres
Poll::Ready(None) if self.responses.is_empty() => {
Poll::Ready(None) => {
trace!("poll_write: at eof, terminating");
let mut request = BytesMut::new();
frontend::terminate(&mut request);
@@ -228,16 +187,7 @@ where
trace!("poll_write: sent eof, closing");
trace!("poll_write: done");
return Poll::Ready(Ok(WriteReady::Terminating));
}
// No more messages from the client, but there are still some responses to wait for.
Poll::Ready(None) => {
trace!(
"poll_write: at eof, pending responses {}",
self.responses.len()
);
ready!(self.poll_flush(cx))?;
return Poll::Ready(Ok(WriteReady::WaitingOnRead));
return Poll::Ready(Ok(()));
}
// Still waiting for a message from the client.
Poll::Pending => {
@@ -298,7 +248,7 @@ where
// if the state is still active, try read from and write to postgres.
let message = self.poll_read(cx)?;
let closing = self.poll_write(cx)?;
if let Poll::Ready(WriteReady::Terminating) = closing {
if let Poll::Ready(()) = closing {
self.state = State::Closing;
}

View File

@@ -86,6 +86,27 @@ pub struct DbError {
}
impl DbError {
pub fn new_test_error(code: SqlState, message: String) -> Self {
DbError {
severity: "ERROR".to_string(),
parsed_severity: Some(Severity::Error),
code,
message,
detail: None,
hint: None,
position: None,
where_: None,
schema: None,
table: None,
column: None,
datatype: None,
constraint: None,
file: None,
line: None,
routine: None,
}
}
pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result<DbError> {
let mut severity = None;
let mut parsed_severity = None;

View File

@@ -15,7 +15,7 @@ mod private {
/// This trait is "sealed", and cannot be implemented outside of this crate.
pub trait GenericClient: private::Sealed {
/// Like `Client::query_raw_txt`.
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,
@@ -28,7 +28,7 @@ pub trait GenericClient: private::Sealed {
impl private::Sealed for Client {}
impl GenericClient for Client {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,
@@ -46,7 +46,7 @@ impl GenericClient for Client {
impl private::Sealed for Transaction<'_> {}
impl GenericClient for Transaction<'_> {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
async fn query_raw_txt<S, I>(&mut self, statement: &str, params: I) -> Result<RowStream, Error>
where
S: AsRef<str> + Sync + Send,
I: IntoIterator<Item = Option<S>> + Sync + Send,

View File

@@ -1,6 +1,5 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
@@ -11,7 +10,6 @@ use tracing::debug;
use crate::client::{CachedTypeInfo, InnerClient};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::{Kind, Oid, Type};
use crate::{Column, Error, Statement, query, slice_iter};
@@ -24,13 +22,13 @@ WHERE t.oid = $1
";
async fn prepare_typecheck(
client: &Arc<InnerClient>,
client: &mut InnerClient,
name: &'static str,
query: &str,
types: &[Type],
) -> Result<Statement, Error> {
let buf = encode(client, name, query, types)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::ParseComplete => {}
@@ -65,10 +63,15 @@ async fn prepare_typecheck(
}
}
Ok(Statement::new(client, name, parameters, columns))
Ok(Statement::new(name, parameters, columns))
}
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
fn encode(
client: &mut InnerClient,
name: &str,
query: &str,
types: &[Type],
) -> Result<Bytes, Error> {
if types.is_empty() {
debug!("preparing query {}: {}", name, query);
} else {
@@ -84,7 +87,7 @@ fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Resu
}
pub async fn get_type(
client: &Arc<InnerClient>,
client: &mut InnerClient,
typecache: &mut CachedTypeInfo,
oid: Oid,
) -> Result<Type, Error> {
@@ -139,7 +142,7 @@ pub async fn get_type(
}
fn get_type_rec<'a>(
client: &'a Arc<InnerClient>,
client: &'a mut InnerClient,
typecache: &'a mut CachedTypeInfo,
oid: Oid,
) -> Pin<Box<dyn Future<Output = Result<Type, Error>> + Send + 'a>> {
@@ -147,7 +150,7 @@ fn get_type_rec<'a>(
}
async fn typeinfo_statement(
client: &Arc<InnerClient>,
client: &mut InnerClient,
typecache: &mut CachedTypeInfo,
) -> Result<Statement, Error> {
if let Some(stmt) = &typecache.typeinfo {

View File

@@ -1,13 +1,10 @@
use std::fmt;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_util::{Stream, ready};
use pin_project_lite::pin_project;
use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use postgres_types2::{Format, ToSql, Type};
@@ -15,7 +12,6 @@ use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::IsNull;
use crate::{Column, Error, ReadyForQueryStatus, Row, Statement};
@@ -28,7 +24,7 @@ impl fmt::Debug for BorrowToSqlParamsDebug<'_> {
}
pub async fn query<'a, I>(
client: &InnerClient,
client: &mut InnerClient,
statement: Statement,
params: I,
) -> Result<RowStream, Error>
@@ -49,20 +45,19 @@ where
};
let responses = start(client, buf).await?;
Ok(RowStream {
statement,
responses,
statement,
command_tag: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Binary,
_p: PhantomPinned,
})
}
pub async fn query_txt<S, I>(
client: &Arc<InnerClient>,
pub async fn query_txt<'a, S, I>(
client: &'a mut InnerClient,
query: &str,
params: I,
) -> Result<RowStream, Error>
) -> Result<RowStream<'a>, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,
@@ -109,7 +104,7 @@ where
})?;
// now read the responses
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::ParseComplete => {}
@@ -150,17 +145,16 @@ where
}
Ok(RowStream {
statement: Statement::new_anonymous(parameters, columns),
responses,
statement: Statement::new_anonymous(parameters, columns),
command_tag: None,
status: ReadyForQueryStatus::Unknown,
output_format: Format::Text,
_p: PhantomPinned,
})
}
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
async fn start(client: &mut InnerClient, buf: Bytes) -> Result<&mut Responses, Error> {
let responses = client.send(FrontendMessage::Raw(buf))?;
match responses.next().await? {
Message::BindComplete => {}
@@ -170,7 +164,11 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
Ok(responses)
}
pub fn encode<'a, I>(client: &InnerClient, statement: &Statement, params: I) -> Result<Bytes, Error>
pub fn encode<'a, I>(
client: &mut InnerClient,
statement: &Statement,
params: I,
) -> Result<Bytes, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
@@ -234,41 +232,37 @@ where
}
}
pin_project! {
/// A stream of table rows.
pub struct RowStream {
statement: Statement,
responses: Responses,
command_tag: Option<String>,
output_format: Format,
status: ReadyForQueryStatus,
#[pin]
_p: PhantomPinned,
}
/// A stream of table rows.
pub struct RowStream<'a> {
responses: &'a mut Responses,
output_format: Format,
pub statement: Statement,
pub command_tag: Option<String>,
pub status: ReadyForQueryStatus,
}
impl Stream for RowStream {
impl Stream for RowStream<'_> {
type Item = Result<Row, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let this = self.get_mut();
loop {
match ready!(this.responses.poll_next(cx)?) {
Message::DataRow(body) => {
return Poll::Ready(Some(Ok(Row::new(
this.statement.clone(),
body,
*this.output_format,
this.output_format,
)?)));
}
Message::EmptyQueryResponse | Message::PortalSuspended => {}
Message::CommandComplete(body) => {
if let Ok(tag) = body.tag() {
*this.command_tag = Some(tag.to_string());
this.command_tag = Some(tag.to_string());
}
}
Message::ReadyForQuery(status) => {
*this.status = status.into();
this.status = status.into();
return Poll::Ready(None);
}
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
@@ -276,24 +270,3 @@ impl Stream for RowStream {
}
}
}
impl RowStream {
/// Returns information about the columns of data in the row.
pub fn columns(&self) -> &[Column] {
self.statement.columns()
}
/// Returns the command tag of this query.
///
/// This is only available after the stream has been exhausted.
pub fn command_tag(&self) -> Option<String> {
self.command_tag.clone()
}
/// Returns if the connection is ready for querying, with the status of the connection.
///
/// This might be available only after the stream has been exhausted.
pub fn ready_status(&self) -> ReadyForQueryStatus {
self.status
}
}

View File

@@ -1,4 +1,3 @@
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -13,7 +12,6 @@ use tracing::debug;
use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow};
/// Information about a column of a single query row.
@@ -33,28 +31,30 @@ impl SimpleColumn {
}
}
pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
pub async fn simple_query<'a>(
client: &'a mut InnerClient,
query: &str,
) -> Result<SimpleQueryStream<'a>, Error> {
debug!("executing simple query: {}", query);
let buf = encode(client, query)?;
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
Ok(SimpleQueryStream {
responses,
columns: None,
status: ReadyForQueryStatus::Unknown,
_p: PhantomPinned,
})
}
pub async fn batch_execute(
client: &InnerClient,
client: &mut InnerClient,
query: &str,
) -> Result<ReadyForQueryStatus, Error> {
debug!("executing statement batch: {}", query);
let buf = encode(client, query)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
let responses = client.send(FrontendMessage::Raw(buf))?;
loop {
match responses.next().await? {
@@ -68,7 +68,7 @@ pub async fn batch_execute(
}
}
pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
pub(crate) fn encode(client: &mut InnerClient, query: &str) -> Result<Bytes, Error> {
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
Ok(buf.split().freeze())
@@ -77,16 +77,14 @@ pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error>
pin_project! {
/// A stream of simple query results.
pub struct SimpleQueryStream {
responses: Responses,
pub struct SimpleQueryStream<'a> {
responses: &'a mut Responses,
columns: Option<Arc<[SimpleColumn]>>,
status: ReadyForQueryStatus,
#[pin]
_p: PhantomPinned,
}
}
impl SimpleQueryStream {
impl SimpleQueryStream<'_> {
/// Returns if the connection is ready for querying, with the status of the connection.
///
/// This might be available only after the stream has been exhausted.
@@ -95,7 +93,7 @@ impl SimpleQueryStream {
}
}
impl Stream for SimpleQueryStream {
impl Stream for SimpleQueryStream<'_> {
type Item = Result<SimpleQueryMessage, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View File

@@ -1,35 +1,16 @@
use std::fmt;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use crate::types::Type;
use postgres_protocol2::Oid;
use postgres_protocol2::message::backend::Field;
use postgres_protocol2::message::frontend;
use crate::client::InnerClient;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::types::Type;
struct StatementInner {
client: Weak<InnerClient>,
name: &'static str,
params: Vec<Type>,
columns: Vec<Column>,
}
impl Drop for StatementInner {
fn drop(&mut self) {
if let Some(client) = self.client.upgrade() {
let buf = client.with_buf(|buf| {
frontend::close(b'S', self.name, buf).unwrap();
frontend::sync(buf);
buf.split().freeze()
});
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
}
}
}
/// A prepared statement.
///
/// Prepared statements can only be used with the connection that created them.
@@ -37,14 +18,8 @@ impl Drop for StatementInner {
pub struct Statement(Arc<StatementInner>);
impl Statement {
pub(crate) fn new(
inner: &Arc<InnerClient>,
name: &'static str,
params: Vec<Type>,
columns: Vec<Column>,
) -> Statement {
pub(crate) fn new(name: &'static str, params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Arc::downgrade(inner),
name,
params,
columns,
@@ -53,7 +28,6 @@ impl Statement {
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Weak::new(),
name: "<anonymous>",
params,
columns,

View File

@@ -1,7 +1,6 @@
use postgres_protocol2::message::frontend;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::query::RowStream;
use crate::{CancelToken, Client, Error, ReadyForQueryStatus};
@@ -24,10 +23,7 @@ impl Drop for Transaction<'_> {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self
.client
.inner()
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
let _ = self.client.inner().send(FrontendMessage::Raw(buf));
}
}
@@ -54,7 +50,11 @@ impl<'a> Transaction<'a> {
}
/// Like `Client::query_raw_txt`.
pub async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
pub async fn query_raw_txt<S, I>(
&mut self,
statement: &str,
params: I,
) -> Result<RowStream, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = Option<S>>,

View File

@@ -544,6 +544,23 @@ impl PageServerConf {
ratio.numerator, ratio.denominator
)
);
let url = Url::parse(&tracing_config.export_config.endpoint)
.map_err(anyhow::Error::msg)
.with_context(|| {
format!(
"tracing endpoint URL is invalid : {}",
tracing_config.export_config.endpoint
)
})?;
ensure!(
url.scheme() == "http" || url.scheme() == "https",
format!(
"tracing endpoint URL must start with http:// or https://: {}",
tracing_config.export_config.endpoint
)
);
}
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
@@ -660,4 +677,25 @@ mod tests {
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
#[test]
fn test_config_tracing_endpoint_is_invalid() {
let input = r#"
control_plane_api = "http://localhost:6666"
[tracing]
sampling_ratio = { numerator = 1, denominator = 0 }
[tracing.export_config]
endpoint = "localhost:4317"
protocol = "http-binary"
timeout = "1ms"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("config has valid fields");
let workdir = Utf8PathBuf::from("/nonexistent");
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect_err("parse_and_validate should fail for endpoint without scheme");
}
}

View File

@@ -449,7 +449,7 @@ async fn build_timeline_info_common(
// Internally we distinguish between the planned GC cutoff (PITR point) and the "applied" GC cutoff (where we
// actually trimmed data to), which can pass each other when PITR is changed.
let min_readable_lsn = std::cmp::max(
timeline.get_gc_cutoff_lsn(),
timeline.get_gc_cutoff_lsn().unwrap_or_default(),
*timeline.get_applied_gc_cutoff_lsn(),
);

View File

@@ -1066,6 +1066,15 @@ pub(crate) static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
});
pub(crate) static TENANT_OFFLOADED_TIMELINES: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_offloaded_timelines",
"Number of offloaded timelines of a tenant",
&["tenant_id", "shard_id"]
)
.expect("Failed to register pageserver_tenant_offloaded_timelines metric")
});
pub(crate) static EVICTION_ITERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_eviction_iteration_duration_seconds_global",
@@ -3551,11 +3560,14 @@ impl TimelineMetrics {
}
pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
let tid = tenant_shard_id.tenant_id.to_string();
let shard_id = tenant_shard_id.shard_slug().to_string();
// Only shard zero deals in synthetic sizes
if tenant_shard_id.is_shard_zero() {
let tid = tenant_shard_id.tenant_id.to_string();
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
}
let _ = TENANT_OFFLOADED_TIMELINES.remove_label_values(&[&tid, &shard_id]);
tenant_throttling::remove_tenant_metrics(tenant_shard_id);

View File

@@ -86,8 +86,8 @@ use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_STATE_METRIC,
TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_OFFLOADED_TIMELINES,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
};
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
@@ -3348,6 +3348,13 @@ impl TenantShard {
activated_timelines += 1;
}
let tid = self.tenant_shard_id.tenant_id.to_string();
let shard_id = self.tenant_shard_id.shard_slug().to_string();
let offloaded_timeline_count = timelines_offloaded_accessor.len();
TENANT_OFFLOADED_TIMELINES
.with_label_values(&[&tid, &shard_id])
.set(offloaded_timeline_count as u64);
self.state.send_modify(move |current_state| {
assert!(
matches!(current_state, TenantState::Activating(_)),
@@ -4587,7 +4594,7 @@ impl TenantShard {
target.cutoffs = GcCutoffs {
space: space_cutoff,
time: Lsn::INVALID,
time: None,
};
}
}
@@ -4671,7 +4678,7 @@ impl TenantShard {
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
if let Some(ancestor_gc_cutoffs) = gc_cutoffs.get(&ancestor_id) {
target.within_ancestor_pitr =
timeline.get_ancestor_lsn() >= ancestor_gc_cutoffs.time;
Some(timeline.get_ancestor_lsn()) >= ancestor_gc_cutoffs.time;
}
}
@@ -4684,13 +4691,15 @@ impl TenantShard {
} else {
0
});
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(target.cutoffs.time)
.unwrap_or(Lsn(0))
.0,
);
if let Some(time_cutoff) = target.cutoffs.time {
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(time_cutoff)
.unwrap_or_default()
.0,
);
}
// Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
// - this timeline was created while we were finding cutoffs
@@ -4699,8 +4708,8 @@ impl TenantShard {
let original_cutoffs = target.cutoffs.clone();
// GC cutoffs should never go back
target.cutoffs = GcCutoffs {
space: Lsn(cutoffs.space.0.max(original_cutoffs.space.0)),
time: Lsn(cutoffs.time.0.max(original_cutoffs.time.0)),
space: cutoffs.space.max(original_cutoffs.space),
time: cutoffs.time.max(original_cutoffs.time),
}
}
}
@@ -5560,6 +5569,14 @@ impl TenantShard {
}
}
// Update metrics
let tid = self.tenant_shard_id.to_string();
let shard_id = self.tenant_shard_id.shard_slug().to_string();
let set_key = &[tid.as_str(), shard_id.as_str()][..];
TENANT_OFFLOADED_TIMELINES
.with_label_values(set_key)
.set(manifest.offloaded_timelines.len() as u64);
// Upload the manifest. Remote storage does no retries internally, so retry here.
match backoff::retry(
|| async {
@@ -8937,7 +8954,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x30);
guard.cutoffs.time = Some(Lsn(0x30));
guard.cutoffs.space = Lsn(0x30);
}
@@ -9045,7 +9062,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.time = Some(Lsn(0x40));
guard.cutoffs.space = Lsn(0x40);
}
tline
@@ -9463,7 +9480,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -9547,7 +9564,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.time = Some(Lsn(0x40));
guard.cutoffs.space = Lsn(0x40);
}
tline
@@ -10018,7 +10035,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -10081,7 +10098,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -10159,7 +10176,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x38);
guard.cutoffs.time = Some(Lsn(0x38));
guard.cutoffs.space = Lsn(0x38);
}
tline
@@ -10267,7 +10284,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -10330,7 +10347,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -10516,7 +10533,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x10),
time: Some(Lsn(0x10)),
space: Lsn(0x10),
},
leases: Default::default(),
@@ -10536,7 +10553,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x50),
time: Some(Lsn(0x50)),
space: Lsn(0x50),
},
leases: Default::default(),
@@ -11257,7 +11274,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11646,7 +11663,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11709,7 +11726,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -11898,7 +11915,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11961,7 +11978,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -12224,7 +12241,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),

View File

@@ -235,7 +235,7 @@ pub(super) async fn gather_inputs(
// than our internal space cutoff. This means that if someone drops a database and waits for their
// PITR interval, they will see synthetic size decrease, even if we are still storing data inside
// the space cutoff.
let mut next_pitr_cutoff = gc_info.cutoffs.time;
let mut next_pitr_cutoff = gc_info.cutoffs.time.unwrap_or_default(); // TODO: handle None
// If the caller provided a shorter retention period, use that instead of the GC cutoff.
let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {

View File

@@ -63,7 +63,28 @@ pub struct InMemoryLayer {
opened_at: Instant,
/// The above fields never change, except for `end_lsn`, which is only set once.
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
///
/// We use a separate lock for the index to reduce the critical section
/// during which reads cannot be planned.
///
/// If you need access to both the index and the underlying file at the same time,
/// respect the following locking order to avoid deadlocks:
/// 1. [`InMemoryLayer::inner`]
/// 2. [`InMemoryLayer::index`]
///
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
/// so it is not necessary to hold simultaneous locks on index.
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
/// In particular:
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// The above fields never change, except for `end_lsn`, which is only set once,
/// and `index` (see rationale there).
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
@@ -81,11 +102,6 @@ impl std::fmt::Debug for InMemoryLayer {
}
pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
@@ -105,7 +121,7 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
trailing_ones
};
/// See [`InMemoryLayerInner::index`].
/// See [`InMemoryLayer::index`].
///
/// For memory efficiency, the data is packed into a u64.
///
@@ -425,7 +441,7 @@ impl InMemoryLayer {
.page_content_kind(PageContentKind::InMemoryLayer)
.attached_child();
let inner = self.inner.read().await;
let index = self.index.read().await;
struct ValueRead {
entry_lsn: Lsn,
@@ -435,10 +451,7 @@ impl InMemoryLayer {
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
.index
.range(range.start.to_compact()..range.end.to_compact())
{
for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
let key = Key::from_compact(*key);
let slice = vec_map.slice_range(lsn_range.clone());
@@ -466,7 +479,7 @@ impl InMemoryLayer {
}
}
}
drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
@@ -573,8 +586,8 @@ impl InMemoryLayer {
start_lsn,
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: RwLock::new(BTreeMap::new()),
inner: RwLock::new(InMemoryLayerInner {
index: BTreeMap::new(),
file,
resource_units: GlobalResourceUnits::new(),
}),
@@ -592,31 +605,39 @@ impl InMemoryLayer {
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
let (base_offset, metadata) = {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = inner.file.len();
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
inner.resource_units.maybe_publish_size(new_size);
(base_offset, metadata)
};
// Update the index with the new entries
let mut index = self.index.write().await;
for meta in metadata {
let SerializedValueMeta {
key,
@@ -639,7 +660,7 @@ impl InMemoryLayer {
will_init,
})?;
let vec_map = inner.index.entry(key).or_default();
let vec_map = index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// This should not break anything, but is unexpected: ingestion code aims to filter out
@@ -658,8 +679,6 @@ impl InMemoryLayer {
);
}
inner.resource_units.maybe_publish_size(new_size);
Ok(())
}
@@ -680,6 +699,18 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
///
/// A note on locking:
/// The current API of [`InMemoryLayer`] does not ensure that there's no ongoing
/// writes while freezing the layer. This is enforced at a higher level via
/// [`crate::tenant::Timeline::write_lock`]. Freeze might be called via two code paths:
/// 1. Via the active [`crate::tenant::timeline::TimelineWriter`]. This holds the
/// Timeline::write_lock for its lifetime. The rolling is handled in
/// [`crate::tenant::timeline::TimelineWriter::put_batch`]. It's a &mut self function
/// so can't be called from different threads.
/// 2. In the background via [`crate::tenant::Timeline::maybe_freeze_ephemeral_layer`].
/// This only proceeds if try_lock on Timeline::write_lock succeeds (i.e. there's no active writer),
/// hence there can be no concurrent writes
pub async fn freeze(&self, end_lsn: Lsn) {
assert!(
self.start_lsn < end_lsn,
@@ -700,8 +731,8 @@ impl InMemoryLayer {
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
let index = self.index.read().await;
for vec_map in index.values() {
for (lsn, _) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
@@ -724,14 +755,11 @@ impl InMemoryLayer {
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called
// `freeze`, and then `write_to_disk` on it. When the thread gets the
// lock, it will see that it's not writeable anymore and retry, but it
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
// write lock on it, so we shouldn't block anyone. See the comment on
// [`InMemoryLayer::freeze`] to understand how locking between the append path
// and layer flushing works.
let inner = self.inner.read().await;
let index = self.index.read().await;
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
@@ -743,13 +771,9 @@ impl InMemoryLayer {
let key_count = if let Some(key_range) = key_range {
let key_range = key_range.start.to_compact()..key_range.end.to_compact();
inner
.index
.iter()
.filter(|(k, _)| key_range.contains(k))
.count()
index.iter().filter(|(k, _)| key_range.contains(k)).count()
} else {
inner.index.len()
index.len()
};
if key_count == 0 {
return Ok(None);
@@ -772,7 +796,7 @@ impl InMemoryLayer {
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in inner.index.iter() {
for (key, vec_map) in index.iter() {
// Write all page versions
for (lsn, entry) in vec_map
.as_slice()

View File

@@ -529,29 +529,24 @@ impl GcInfo {
/// The `GcInfo` component describing which Lsns need to be retained. Functionally, this
/// is a single number (the oldest LSN which we must retain), but it internally distinguishes
/// between time-based and space-based retention for observability and consumption metrics purposes.
#[derive(Debug, Clone)]
#[derive(Clone, Debug, Default)]
pub(crate) struct GcCutoffs {
/// Calculated from the [`pageserver_api::models::TenantConfig::gc_horizon`], this LSN indicates how much
/// history we must keep to retain a specified number of bytes of WAL.
pub(crate) space: Lsn,
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates how much
/// history we must keep to enable reading back at least the PITR interval duration.
pub(crate) time: Lsn,
}
impl Default for GcCutoffs {
fn default() -> Self {
Self {
space: Lsn::INVALID,
time: Lsn::INVALID,
}
}
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates
/// how much history we must keep to enable reading back at least the PITR interval duration.
///
/// None indicates that the PITR cutoff has not been computed. A PITR interval of 0 will yield
/// Some(last_record_lsn).
pub(crate) time: Option<Lsn>,
}
impl GcCutoffs {
fn select_min(&self) -> Lsn {
std::cmp::min(self.space, self.time)
// NB: if we haven't computed the PITR cutoff yet, we can't GC anything.
self.space.min(self.time.unwrap_or_default())
}
}
@@ -1088,11 +1083,14 @@ impl Timeline {
/// Get the bytes written since the PITR cutoff on this branch, and
/// whether this branch's ancestor_lsn is within its parent's PITR.
pub(crate) fn get_pitr_history_stats(&self) -> (u64, bool) {
// TODO: for backwards compatibility, we return the full history back to 0 when the PITR
// cutoff has not yet been initialized. This should return None instead, but this is exposed
// in external HTTP APIs and callers may not handle a null value.
let gc_info = self.gc_info.read().unwrap();
let history = self
.get_last_record_lsn()
.checked_sub(gc_info.cutoffs.time)
.unwrap_or(Lsn(0))
.checked_sub(gc_info.cutoffs.time.unwrap_or_default())
.unwrap_or_default()
.0;
(history, gc_info.within_ancestor_pitr)
}
@@ -1102,9 +1100,10 @@ impl Timeline {
self.applied_gc_cutoff_lsn.read()
}
/// Read timeline's planned GC cutoff: this is the logical end of history that users
/// are allowed to read (based on configured PITR), even if physically we have more history.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
/// Read timeline's planned GC cutoff: this is the logical end of history that users are allowed
/// to read (based on configured PITR), even if physically we have more history. Returns None
/// if the PITR cutoff has not yet been initialized.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Option<Lsn> {
self.gc_info.read().unwrap().cutoffs.time
}
@@ -6235,14 +6234,12 @@ impl Timeline {
pausable_failpoint!("Timeline::find_gc_cutoffs-pausable");
if cfg!(test) {
if cfg!(test) && pitr == Duration::ZERO {
// Unit tests which specify zero PITR interval expect to avoid doing any I/O for timestamp lookup
if pitr == Duration::ZERO {
return Ok(GcCutoffs {
time: self.get_last_record_lsn(),
space: space_cutoff,
});
}
return Ok(GcCutoffs {
time: Some(self.get_last_record_lsn()),
space: space_cutoff,
});
}
// Calculate a time-based limit on how much to retain:
@@ -6256,14 +6253,14 @@ impl Timeline {
// PITR is not set. Retain the size-based limit, or the default time retention,
// whichever requires less data.
GcCutoffs {
time: self.get_last_record_lsn(),
time: Some(self.get_last_record_lsn()),
space: std::cmp::max(time_cutoff, space_cutoff),
}
}
(Duration::ZERO, None) => {
// PITR is not set, and time lookup failed
GcCutoffs {
time: self.get_last_record_lsn(),
time: Some(self.get_last_record_lsn()),
space: space_cutoff,
}
}
@@ -6271,7 +6268,7 @@ impl Timeline {
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
// cannot advance beyond what was already GC'd, and respect space-based retention
GcCutoffs {
time: *self.get_applied_gc_cutoff_lsn(),
time: Some(*self.get_applied_gc_cutoff_lsn()),
space: space_cutoff,
}
}
@@ -6279,7 +6276,7 @@ impl Timeline {
// PITR interval is set and we looked up timestamp successfully. Ignore
// size based retention and make time cutoff authoritative
GcCutoffs {
time: time_cutoff,
time: Some(time_cutoff),
space: time_cutoff,
}
}
@@ -6332,7 +6329,7 @@ impl Timeline {
)
};
let mut new_gc_cutoff = Lsn::min(space_cutoff, time_cutoff);
let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
let standby_horizon = self.standby_horizon.load();
// Hold GC for the standby, but as a safety guard do it only within some
// reasonable lag.
@@ -6381,7 +6378,7 @@ impl Timeline {
async fn gc_timeline(
&self,
space_cutoff: Lsn,
time_cutoff: Lsn,
time_cutoff: Option<Lsn>, // None if uninitialized
retain_lsns: Vec<Lsn>,
max_lsn_with_valid_lease: Option<Lsn>,
new_gc_cutoff: Lsn,
@@ -6400,6 +6397,12 @@ impl Timeline {
return Ok(result);
}
let Some(time_cutoff) = time_cutoff else {
// The GC cutoff should have been computed by now, but let's be defensive.
info!("Nothing to GC: time_cutoff not yet computed");
return Ok(result);
};
// We need to ensure that no one tries to read page versions or create
// branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
// for details. This will block until the old value is no longer in use.

View File

@@ -1526,7 +1526,7 @@ impl Timeline {
info!(
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
checked {layers_checked}/{layers_total} layers \
(latest_gc_cutoff={} pitr_cutoff={})",
(latest_gc_cutoff={} pitr_cutoff={:?})",
layers_to_rewrite.len(),
drop_layers.len(),
*latest_gc_cutoff,

17
poetry.lock generated
View File

@@ -3170,19 +3170,24 @@ pbr = "*"
[[package]]
name = "setuptools"
version = "70.0.0"
version = "78.1.1"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"},
{file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"},
{file = "setuptools-78.1.1-py3-none-any.whl", hash = "sha256:c3a9c4211ff4c309edb8b8c4f1cbfa7ae324c4ba9f91ff254e3d305b9fd54561"},
{file = "setuptools-78.1.1.tar.gz", hash = "sha256:fcc17fd9cd898242f6b4adfaca46137a9edef687f43e6f78469692a5e70d851d"},
]
[package.extras]
docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov ; platform_python_implementation != \"PyPy\"", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""]
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"]
[[package]]
name = "six"

View File

@@ -127,3 +127,4 @@ rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
tracing-test = "0.2"

View File

@@ -80,10 +80,22 @@ impl std::fmt::Display for Backend<'_, ()> {
.field(&endpoint.url())
.finish(),
#[cfg(any(test, feature = "testing"))]
ControlPlaneClient::PostgresMock(endpoint) => fmt
.debug_tuple("ControlPlane::PostgresMock")
.field(&endpoint.url())
.finish(),
ControlPlaneClient::PostgresMock(endpoint) => {
let url = endpoint.url();
match url::Url::parse(url) {
Ok(mut url) => {
let _ = url.set_password(Some("_redacted_"));
let url = url.as_str();
fmt.debug_tuple("ControlPlane::PostgresMock")
.field(&url)
.finish()
}
Err(_) => fmt
.debug_tuple("ControlPlane::PostgresMock")
.field(&url)
.finish(),
}
}
#[cfg(test)]
ControlPlaneClient::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(),
},

View File

@@ -1,9 +1,13 @@
#[cfg(any(test, feature = "testing"))]
use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
#[cfg(any(test, feature = "testing"))]
use anyhow::Context;
use anyhow::{bail, ensure};
use arc_swap::ArcSwapOption;
use futures::future::Either;
@@ -35,6 +39,8 @@ use crate::scram::threadpool::ThreadPool;
use crate::serverless::GlobalConnPoolOptions;
use crate::serverless::cancel_set::CancelSet;
use crate::tls::client_config::compute_client_config_with_root_certs;
#[cfg(any(test, feature = "testing"))]
use crate::url::ApiUrl;
use crate::{auth, control_plane, http, serverless, usage_metrics};
project_git_version!(GIT_VERSION);
@@ -777,7 +783,13 @@ fn build_auth_backend(
#[cfg(any(test, feature = "testing"))]
AuthBackendType::Postgres => {
let url = args.auth_endpoint.parse()?;
let mut url: ApiUrl = args.auth_endpoint.parse()?;
if url.password().is_none() {
let password = env::var("PGPASSWORD")
.with_context(|| "auth-endpoint does not contain a password and environment variable `PGPASSWORD` is not set")?;
url.set_password(Some(&password))
.expect("Failed to set password");
}
let api = control_plane::client::mock::MockControlPlane::new(
url,
!args.is_private_access_proxy,

View File

@@ -48,7 +48,7 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
use postgres_client::error::SqlState;
// Here are errors that happens after the user successfully authenticated to the database.
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
!matches!(
let non_retriable_pg_errors = matches!(
self.code(),
&SqlState::TOO_MANY_CONNECTIONS
| &SqlState::OUT_OF_MEMORY
@@ -56,8 +56,20 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
| &SqlState::T_R_SERIALIZATION_FAILURE
| &SqlState::INVALID_CATALOG_NAME
| &SqlState::INVALID_SCHEMA_NAME
| &SqlState::INVALID_PARAMETER_VALUE
)
| &SqlState::INVALID_PARAMETER_VALUE,
);
if non_retriable_pg_errors {
return false;
}
// PGBouncer errors that should not trigger a wake_compute retry.
if self.code() == &SqlState::PROTOCOL_VIOLATION {
// Source for the error message:
// https://github.com/pgbouncer/pgbouncer/blob/f15997fe3effe3a94ba8bcc1ea562e6117d1a131/src/client.c#L1070
return !self
.message()
.contains("no more connections allowed (max_client_conn)");
}
true
}
}
@@ -110,3 +122,55 @@ pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Durati
.base_delay
.mul_f64(config.backoff_factor.powi((num_retries as i32) - 1))
}
#[cfg(test)]
mod tests {
use super::ShouldRetryWakeCompute;
use postgres_client::error::{DbError, SqlState};
#[test]
fn should_retry_wake_compute_for_db_error() {
// These SQLStates should NOT trigger a wake_compute retry.
let non_retry_states = [
SqlState::TOO_MANY_CONNECTIONS,
SqlState::OUT_OF_MEMORY,
SqlState::SYNTAX_ERROR,
SqlState::T_R_SERIALIZATION_FAILURE,
SqlState::INVALID_CATALOG_NAME,
SqlState::INVALID_SCHEMA_NAME,
SqlState::INVALID_PARAMETER_VALUE,
];
for state in non_retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
!err.should_retry_wake_compute(),
"State {state:?} unexpectedly retried"
);
}
// Errors coming from pgbouncer should not trigger a wake_compute retry
let non_retry_pgbouncer_errors = ["no more connections allowed (max_client_conn)"];
for error in non_retry_pgbouncer_errors {
let err = DbError::new_test_error(SqlState::PROTOCOL_VIOLATION, error.to_string());
assert!(
!err.should_retry_wake_compute(),
"PGBouncer error {error:?} unexpectedly retried"
);
}
// These SQLStates should trigger a wake_compute retry.
let retry_states = [
SqlState::CONNECTION_FAILURE,
SqlState::CONNECTION_EXCEPTION,
SqlState::CONNECTION_DOES_NOT_EXIST,
SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
];
for state in retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
err.should_retry_wake_compute(),
"State {state:?} unexpectedly skipped retry"
);
}
}
}

View File

@@ -15,6 +15,7 @@ use rstest::rstest;
use rustls::crypto::ring;
use rustls::pki_types;
use tokio::io::DuplexStream;
use tracing_test::traced_test;
use super::connect_compute::ConnectMechanism;
use super::retry::CouldRetry;
@@ -381,8 +382,14 @@ enum ConnectAction {
WakeFail,
WakeRetry,
Connect,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = true
Retry,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = false
RetryNoWake,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = true
Fail,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = false
FailNoWake,
}
#[derive(Clone)]
@@ -424,6 +431,7 @@ struct TestConnection;
#[derive(Debug)]
struct TestConnectError {
retryable: bool,
wakeable: bool,
kind: crate::error::ErrorKind,
}
@@ -448,7 +456,7 @@ impl CouldRetry for TestConnectError {
}
impl ShouldRetryWakeCompute for TestConnectError {
fn should_retry_wake_compute(&self) -> bool {
true
self.wakeable
}
}
@@ -471,10 +479,22 @@ impl ConnectMechanism for TestConnectMechanism {
ConnectAction::Connect => Ok(TestConnection),
ConnectAction::Retry => Err(TestConnectError {
retryable: true,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::RetryNoWake => Err(TestConnectError {
retryable: true,
wakeable: false,
kind: ErrorKind::Compute,
}),
ConnectAction::Fail => Err(TestConnectError {
retryable: false,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::FailNoWake => Err(TestConnectError {
retryable: false,
wakeable: false,
kind: ErrorKind::Compute,
}),
x => panic!("expecting action {x:?}, connect is called instead"),
@@ -709,3 +729,92 @@ async fn wake_non_retry() {
.unwrap_err();
mechanism.verify();
}
#[tokio::test]
#[traced_test]
async fn fail_but_wake_invalidates_cache() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::Fail,
ConnectAction::Wake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn fail_no_wake_skips_cache_invalidation() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::FailNoWake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_but_wake_invalidates_cache() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → Retry (retryable + wakeable) → Wake → Connect
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap();
mechanism.verify();
// Because Retry has wakeable=true, we should see invalidate_cache
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_no_wake_skips_invalidation() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → RetryNoWake (retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap_err();
mechanism.verify();
// Because RetryNoWake has wakeable=false, we must NOT see invalidate_cache
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}

View File

@@ -14,7 +14,9 @@ use hyper::http::{HeaderName, HeaderValue};
use hyper::{HeaderMap, Request, Response, StatusCode, header};
use indexmap::IndexMap;
use postgres_client::error::{DbError, ErrorPosition, SqlState};
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use postgres_client::{
GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction,
};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use serde_json::Value;
@@ -1092,12 +1094,10 @@ async fn query_to_json<T: GenericClient>(
let query_start = Instant::now();
let query_params = data.params;
let mut row_stream = std::pin::pin!(
client
.query_raw_txt(&data.query, query_params)
.await
.map_err(SqlOverHttpError::Postgres)?
);
let mut row_stream = client
.query_raw_txt(&data.query, query_params)
.await
.map_err(SqlOverHttpError::Postgres)?;
let query_acknowledged = Instant::now();
// Manually drain the stream into a vector to leave row_stream hanging
@@ -1118,10 +1118,15 @@ async fn query_to_json<T: GenericClient>(
}
let query_resp_end = Instant::now();
let ready = row_stream.ready_status();
let RowStream {
statement,
command_tag,
status: ready,
..
} = row_stream;
// grab the command tag and number of rows affected
let command_tag = row_stream.command_tag().unwrap_or_default();
let command_tag = command_tag.unwrap_or_default();
let mut command_tag_split = command_tag.split(' ');
let command_tag_name = command_tag_split.next().unwrap_or_default();
let command_tag_count = if command_tag_name == "INSERT" {
@@ -1142,11 +1147,11 @@ async fn query_to_json<T: GenericClient>(
"finished executing query"
);
let columns_len = row_stream.columns().len();
let columns_len = statement.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut columns = Vec::with_capacity(columns_len);
for c in row_stream.columns() {
for c in statement.columns() {
fields.push(json!({
"name": c.name().to_owned(),
"dataTypeID": c.type_().oid(),

View File

@@ -43,6 +43,12 @@ impl std::ops::Deref for ApiUrl {
}
}
impl std::ops::DerefMut for ApiUrl {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl std::fmt::Display for ApiUrl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)

View File

@@ -184,6 +184,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_tenant_offloaded_timelines",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),

View File

@@ -193,6 +193,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
"test_ancestor_branch_archive_branch1", tenant_id, "test_ancestor_branch_archive_parent"
)
offloaded_count = ps_http.get_metric_value(
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
)
assert offloaded_count == 0
ps_http.timeline_archival_config(
tenant_id,
leaf_timeline_id,
@@ -244,6 +249,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
wait_until(leaf_offloaded)
wait_until(parent_offloaded)
offloaded_count = ps_http.get_metric_value(
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
)
assert offloaded_count == 2
# Offloaded child timelines should still prevent deletion
with pytest.raises(
PageserverApiException,

View File

@@ -107,6 +107,7 @@ tower = { version = "0.4", default-features = false, features = ["balance", "buf
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] }
zeroize = { version = "1", features = ["derive", "serde"] }