Compare commits

..

1 Commits

Author SHA1 Message Date
Arpad Müller
6077f5062d Add skip_serializing_none 2024-12-03 15:58:57 +01:00
80 changed files with 1211 additions and 1296 deletions

2
Cargo.lock generated
View File

@@ -4209,6 +4209,7 @@ dependencies = [
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand 0.8.5",
"sha2",
@@ -4611,7 +4612,6 @@ dependencies = [
"tikv-jemalloc-ctl",
"tikv-jemallocator",
"tokio",
"tokio-postgres",
"tokio-postgres2",
"tokio-rustls 0.26.0",
"tokio-tungstenite",

View File

@@ -442,14 +442,7 @@ impl Default for ConfigToml {
tenant_config: TenantConfigToml::default(),
no_sync: None,
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
page_service_pipelining: if !cfg!(test) {
PageServicePipeliningConfig::Serial
} else {
PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined {
max_batch_size: NonZeroUsize::new(32).unwrap(),
execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures,
})
},
page_service_pipelining: PageServicePipeliningConfig::Serial,
}
}
}

View File

@@ -48,7 +48,7 @@ pub struct TenantCreateResponse {
pub shards: Vec<TenantCreateResponseShard>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize)]
pub struct NodeRegisterRequest {
pub node_id: NodeId,
@@ -75,7 +75,7 @@ pub struct TenantPolicyRequest {
pub scheduling: Option<ShardSchedulingPolicy>,
}
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AvailabilityZone(pub String);
impl Display for AvailabilityZone {

View File

@@ -770,11 +770,6 @@ impl Key {
&& self.field6 == 1
}
#[inline(always)]
pub fn is_aux_file_key(&self) -> bool {
self.field1 == AUX_KEY_PREFIX
}
/// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
#[inline(always)]
pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {

View File

@@ -327,6 +327,7 @@ impl Default for ShardParameters {
/// An alternative representation of `pageserver::tenant::TenantConf` with
/// simpler types.
#[serde_with::skip_serializing_none]
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
@@ -501,9 +502,7 @@ pub struct EvictionPolicyLayerAccessThreshold {
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ThrottleConfig {
/// See [`ThrottleConfigTaskKinds`] for why we do the serde `rename`.
#[serde(rename = "task_kinds")]
pub enabled: ThrottleConfigTaskKinds,
pub task_kinds: Vec<String>, // TaskKind
pub initial: u32,
#[serde(with = "humantime_serde")]
pub refill_interval: Duration,
@@ -511,38 +510,10 @@ pub struct ThrottleConfig {
pub max: u32,
}
/// Before <https://github.com/neondatabase/neon/pull/9962>
/// the throttle was a per `Timeline::get`/`Timeline::get_vectored` call.
/// The `task_kinds` field controlled which Pageserver "Task Kind"s
/// were subject to the throttle.
///
/// After that PR, the throttle is applied at pagestream request level
/// and the `task_kinds` field does not apply since the only task kind
/// that us subject to the throttle is that of the page service.
///
/// However, we don't want to make a breaking config change right now
/// because it means we have to migrate all the tenant configs.
/// This will be done in a future PR.
///
/// In the meantime, we use emptiness / non-emptsiness of the `task_kinds`
/// field to determine if the throttle is enabled or not.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(transparent)]
pub struct ThrottleConfigTaskKinds(Vec<String>);
impl ThrottleConfigTaskKinds {
pub fn disabled() -> Self {
Self(vec![])
}
pub fn is_enabled(&self) -> bool {
!self.0.is_empty()
}
}
impl ThrottleConfig {
pub fn disabled() -> Self {
Self {
enabled: ThrottleConfigTaskKinds::disabled(),
task_kinds: vec![], // effectively disables the throttle
// other values don't matter with emtpy `task_kinds`.
initial: 0,
refill_interval: Duration::from_millis(1),
@@ -556,30 +527,6 @@ impl ThrottleConfig {
}
}
#[cfg(test)]
mod throttle_config_tests {
use super::*;
#[test]
fn test_disabled_is_disabled() {
let config = ThrottleConfig::disabled();
assert!(!config.enabled.is_enabled());
}
#[test]
fn test_enabled_backwards_compat() {
let input = serde_json::json!({
"task_kinds": ["PageRequestHandler"],
"initial": 40000,
"refill_interval": "50ms",
"refill_amount": 1000,
"max": 40000,
"fair": true
});
let config: ThrottleConfig = serde_json::from_value(input).unwrap();
assert!(config.enabled.is_enabled());
}
}
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
/// lists out all possible states (and the virtual "Detached" state)
/// in a flat form rather than using rust-style enums.

View File

@@ -170,37 +170,19 @@ impl ShardIdentity {
}
}
/// Return true if the key should be stored on all shards, not just one.
fn is_key_global(&self, key: &Key) -> bool {
if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() {
// Special keys that are only stored on shard 0
false
} else if key.is_rel_block_key() {
// Ordinary relation blocks are distributed across shards
false
} else if key.is_rel_size_key() {
// All shards maintain rel size keys (although only shard 0 is responsible for
// keeping it strictly accurate, other shards just reflect the highest block they've ingested)
true
} else {
// For everything else, we assume it must be kept everywhere, because ingest code
// might assume this -- this covers functionality where the ingest code has
// not (yet) been made fully shard aware.
true
}
}
/// Return true if the key should be discarded if found in this shard's
/// data store, e.g. during compaction after a split.
///
/// Shards _may_ drop keys which return false here, but are not obliged to.
pub fn is_key_disposable(&self, key: &Key) -> bool {
if self.count < ShardCount(2) {
// Fast path: unsharded tenant doesn't dispose of anything
return false;
}
if self.is_key_global(key) {
if key_is_shard0(key) {
// Q: Why can't we dispose of shard0 content if we're not shard 0?
// A1: because the WAL ingestion logic currently ingests some shard 0
// content on all shards, even though it's only read on shard 0. If we
// dropped it, then subsequent WAL ingest to these keys would encounter
// an error.
// A2: because key_is_shard0 also covers relation size keys, which are written
// on all shards even though they're only maintained accurately on shard 0.
false
} else {
!self.is_key_local(key)

View File

@@ -278,7 +278,7 @@ pub fn generate_pg_control(
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<(Bytes, u64, bool)> {
) -> anyhow::Result<(Bytes, u64)> {
dispatch_pgversion!(
pg_version,
pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),

View File

@@ -124,64 +124,23 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
}
}
/// Generate a pg_control file, for a basebackup for starting up Postgres at the given LSN
///
/// 'pg_control_bytes' and 'checkpoint_bytes' are the contents of those keys persisted in
/// the pageserver. They use the same format as the PostgreSQL control file and the
/// checkpoint record, but see walingest.rs for how exactly they are kept up to date.
/// 'lsn' is the LSN at which we're starting up.
///
/// Returns:
/// - pg_control file contents
/// - system_identifier, extracted from the persisted information
/// - true, if we're starting up from a "clean shutdown", i.e. if there was a shutdown
/// checkpoint at the given LSN
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
) -> anyhow::Result<(Bytes, u64, bool)> {
) -> anyhow::Result<(Bytes, u64)> {
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
let was_shutdown;
// Generate new pg_control needed for bootstrap
//
// NB: In the checkpoint struct that we persist in the pageserver, we have a different
// convention for the 'redo' field than in PostgreSQL: On a shutdown checkpoint,
// 'redo' points the *end* of the checkpoint WAL record. On PostgreSQL, it points to
// the beginning. Furthermore, on an online checkpoint, 'redo' is set to 0.
//
// We didn't always have this convention however, and old persisted records will have
// old REDO values that point to some old LSN.
//
// The upshot is that if 'redo' is equal to the "current" LSN, there was a shutdown
// checkpoint record at that point in WAL, with no new WAL records after it. That case
// can be treated as starting from a clean shutdown. All other cases are treated as
// non-clean shutdown. In Neon, we don't do WAL replay at startup in either case, so
// that distinction doesn't matter very much. As of this writing, it only affects
// whether the persisted pg_stats information can be used or not.
//
// In the Checkpoint struct in the returned pg_control file, the redo pointer is
// always set to the LSN we're starting at, to hint that no WAL replay is required.
// (There's some neon-specific code in Postgres startup to make that work, though.
// Just setting the redo pointer is not sufficient.)
if Lsn(checkpoint.redo) == lsn {
was_shutdown = true;
} else {
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
was_shutdown = false;
}
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
// We use DBState_DB_SHUTDOWNED even if it was not a clean shutdown. The
// neon-specific code at postgres startup ignores the state stored in the control
// file, similar to archive recovery in standalone PostgreSQL. Similarly, the
// checkPoint pointer is ignored, so just set it to 0.
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = DBState_DB_SHUTDOWNED;
Ok((pg_control.encode(), pg_control.system_identifier, was_shutdown))
Ok((pg_control.encode(), pg_control.system_identifier))
}
pub fn get_current_timestamp() -> TimestampTz {

View File

@@ -10,6 +10,7 @@ byteorder.workspace = true
bytes.workspace = true
fallible-iterator.workspace = true
hmac.workspace = true
md-5 = "0.10"
memchr = "2.0"
rand.workspace = true
sha2.workspace = true

View File

@@ -1,2 +1,37 @@
//! Authentication protocol support.
use md5::{Digest, Md5};
pub mod sasl;
/// Hashes authentication information in a way suitable for use in response
/// to an `AuthenticationMd5Password` message.
///
/// The resulting string should be sent back to the database in a
/// `PasswordMessage` message.
#[inline]
pub fn md5_hash(username: &[u8], password: &[u8], salt: [u8; 4]) -> String {
let mut md5 = Md5::new();
md5.update(password);
md5.update(username);
let output = md5.finalize_reset();
md5.update(format!("{:x}", output));
md5.update(salt);
format!("md5{:x}", md5.finalize())
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn md5() {
let username = b"md5_user";
let password = b"password";
let salt = [0x2a, 0x3d, 0x8f, 0xe0];
assert_eq!(
md5_hash(username, password, salt),
"md562af4dd09bbb41884907a838a3233294"
);
}
}

View File

@@ -79,7 +79,7 @@ pub enum Message {
AuthenticationCleartextPassword,
AuthenticationGss,
AuthenticationKerberosV5,
AuthenticationMd5Password,
AuthenticationMd5Password(AuthenticationMd5PasswordBody),
AuthenticationOk,
AuthenticationScmCredential,
AuthenticationSspi,
@@ -191,7 +191,11 @@ impl Message {
0 => Message::AuthenticationOk,
2 => Message::AuthenticationKerberosV5,
3 => Message::AuthenticationCleartextPassword,
5 => Message::AuthenticationMd5Password,
5 => {
let mut salt = [0; 4];
buf.read_exact(&mut salt)?;
Message::AuthenticationMd5Password(AuthenticationMd5PasswordBody { salt })
}
6 => Message::AuthenticationScmCredential,
7 => Message::AuthenticationGss,
8 => Message::AuthenticationGssContinue,

View File

@@ -8,6 +8,7 @@
use crate::authentication::sasl;
use hmac::{Hmac, Mac};
use md5::Md5;
use rand::RngCore;
use sha2::digest::FixedOutput;
use sha2::{Digest, Sha256};
@@ -87,3 +88,20 @@ pub(crate) async fn scram_sha_256_salt(
base64::encode(server_key)
)
}
/// **Not recommended, as MD5 is not considered to be secure.**
///
/// Hash password using MD5 with the username as the salt.
///
/// The client may assume the returned string doesn't contain any
/// special characters that would require escaping.
pub fn md5(password: &[u8], username: &str) -> String {
// salt password with username
let mut salted_password = Vec::from(password);
salted_password.extend_from_slice(username.as_bytes());
let mut hash = Md5::new();
hash.update(&salted_password);
let digest = hash.finalize();
format!("md5{:x}", digest)
}

View File

@@ -9,3 +9,11 @@ async fn test_encrypt_scram_sha_256() {
"SCRAM-SHA-256$4096:AQIDBAUGBwgJCgsMDQ4PEA==$8rrDg00OqaiWXJ7p+sCgHEIaBSHY89ZJl3mfIsf32oY=:05L1f+yZbiN8O0AnO40Og85NNRhvzTS57naKRWCcsIA="
);
}
#[test]
fn test_encrypt_md5() {
assert_eq!(
password::md5(b"secret", "foo"),
"md54ab2c5d00339c4b2a4e921d2dc4edec7"
);
}

View File

@@ -6,9 +6,11 @@ use crate::connect_raw::RawConnection;
use crate::tls::MakeTlsConnect;
use crate::tls::TlsConnect;
use crate::{Client, Connection, Error};
use std::fmt;
use std::borrow::Cow;
use std::str;
use std::str::FromStr;
use std::time::Duration;
use std::{error, fmt, iter, mem};
use tokio::io::{AsyncRead, AsyncWrite};
pub use postgres_protocol2::authentication::sasl::ScramKeys;
@@ -146,9 +148,6 @@ pub enum AuthKeys {
/// ```
#[derive(Clone, PartialEq, Eq)]
pub struct Config {
pub(crate) host: Host,
pub(crate) port: u16,
pub(crate) user: Option<String>,
pub(crate) password: Option<Vec<u8>>,
pub(crate) auth_keys: Option<Box<AuthKeys>>,
@@ -156,6 +155,8 @@ pub struct Config {
pub(crate) options: Option<String>,
pub(crate) application_name: Option<String>,
pub(crate) ssl_mode: SslMode,
pub(crate) host: Vec<Host>,
pub(crate) port: Vec<u16>,
pub(crate) connect_timeout: Option<Duration>,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
@@ -163,12 +164,16 @@ pub struct Config {
pub(crate) max_backend_message_size: Option<usize>,
}
impl Default for Config {
fn default() -> Config {
Config::new()
}
}
impl Config {
/// Creates a new configuration.
pub fn new(host: String, port: u16) -> Config {
pub fn new() -> Config {
Config {
host: Host::Tcp(host),
port,
user: None,
password: None,
auth_keys: None,
@@ -176,6 +181,8 @@ impl Config {
options: None,
application_name: None,
ssl_mode: SslMode::Prefer,
host: vec![],
port: vec![],
connect_timeout: None,
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
@@ -278,14 +285,32 @@ impl Config {
self.ssl_mode
}
/// Adds a host to the configuration.
///
/// Multiple hosts can be specified by calling this method multiple times, and each will be tried in order.
pub fn host(&mut self, host: &str) -> &mut Config {
self.host.push(Host::Tcp(host.to_string()));
self
}
/// Gets the hosts that have been added to the configuration with `host`.
pub fn get_host(&self) -> &Host {
pub fn get_hosts(&self) -> &[Host] {
&self.host
}
/// Adds a port to the configuration.
///
/// Multiple ports can be specified by calling this method multiple times. There must either be no ports, in which
/// case the default of 5432 is used, a single port, in which it is used for all hosts, or the same number of ports
/// as hosts.
pub fn port(&mut self, port: u16) -> &mut Config {
self.port.push(port);
self
}
/// Gets the ports that have been added to the configuration with `port`.
pub fn get_port(&self) -> u16 {
self.port
pub fn get_ports(&self) -> &[u16] {
&self.port
}
/// Sets the timeout applied to socket-level connection attempts.
@@ -355,6 +380,99 @@ impl Config {
self.max_backend_message_size
}
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
match key {
"user" => {
self.user(value);
}
"password" => {
self.password(value);
}
"dbname" => {
self.dbname(value);
}
"options" => {
self.options(value);
}
"application_name" => {
self.application_name(value);
}
"sslmode" => {
let mode = match value {
"disable" => SslMode::Disable,
"prefer" => SslMode::Prefer,
"require" => SslMode::Require,
_ => return Err(Error::config_parse(Box::new(InvalidValue("sslmode")))),
};
self.ssl_mode(mode);
}
"host" => {
for host in value.split(',') {
self.host(host);
}
}
"port" => {
for port in value.split(',') {
let port = if port.is_empty() {
5432
} else {
port.parse()
.map_err(|_| Error::config_parse(Box::new(InvalidValue("port"))))?
};
self.port(port);
}
}
"connect_timeout" => {
let timeout = value
.parse::<i64>()
.map_err(|_| Error::config_parse(Box::new(InvalidValue("connect_timeout"))))?;
if timeout > 0 {
self.connect_timeout(Duration::from_secs(timeout as u64));
}
}
"target_session_attrs" => {
let target_session_attrs = match value {
"any" => TargetSessionAttrs::Any,
"read-write" => TargetSessionAttrs::ReadWrite,
_ => {
return Err(Error::config_parse(Box::new(InvalidValue(
"target_session_attrs",
))));
}
};
self.target_session_attrs(target_session_attrs);
}
"channel_binding" => {
let channel_binding = match value {
"disable" => ChannelBinding::Disable,
"prefer" => ChannelBinding::Prefer,
"require" => ChannelBinding::Require,
_ => {
return Err(Error::config_parse(Box::new(InvalidValue(
"channel_binding",
))))
}
};
self.channel_binding(channel_binding);
}
"max_backend_message_size" => {
let limit = value.parse::<usize>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("max_backend_message_size")))
})?;
if limit > 0 {
self.max_backend_message_size(limit);
}
}
key => {
return Err(Error::config_parse(Box::new(UnknownOption(
key.to_string(),
))));
}
}
Ok(())
}
/// Opens a connection to a PostgreSQL database.
///
/// Requires the `runtime` Cargo feature (enabled by default).
@@ -381,6 +499,17 @@ impl Config {
}
}
impl FromStr for Config {
type Err = Error;
fn from_str(s: &str) -> Result<Config, Error> {
match UrlParser::parse(s)? {
Some(config) => Ok(config),
None => Parser::parse(s),
}
}
}
// Omit password from debug output
impl fmt::Debug for Config {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -407,3 +536,360 @@ impl fmt::Debug for Config {
.finish()
}
}
#[derive(Debug)]
struct UnknownOption(String);
impl fmt::Display for UnknownOption {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "unknown option `{}`", self.0)
}
}
impl error::Error for UnknownOption {}
#[derive(Debug)]
struct InvalidValue(&'static str);
impl fmt::Display for InvalidValue {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "invalid value for option `{}`", self.0)
}
}
impl error::Error for InvalidValue {}
struct Parser<'a> {
s: &'a str,
it: iter::Peekable<str::CharIndices<'a>>,
}
impl<'a> Parser<'a> {
fn parse(s: &'a str) -> Result<Config, Error> {
let mut parser = Parser {
s,
it: s.char_indices().peekable(),
};
let mut config = Config::new();
while let Some((key, value)) = parser.parameter()? {
config.param(key, &value)?;
}
Ok(config)
}
fn skip_ws(&mut self) {
self.take_while(char::is_whitespace);
}
fn take_while<F>(&mut self, f: F) -> &'a str
where
F: Fn(char) -> bool,
{
let start = match self.it.peek() {
Some(&(i, _)) => i,
None => return "",
};
loop {
match self.it.peek() {
Some(&(_, c)) if f(c) => {
self.it.next();
}
Some(&(i, _)) => return &self.s[start..i],
None => return &self.s[start..],
}
}
}
fn eat(&mut self, target: char) -> Result<(), Error> {
match self.it.next() {
Some((_, c)) if c == target => Ok(()),
Some((i, c)) => {
let m = format!(
"unexpected character at byte {}: expected `{}` but got `{}`",
i, target, c
);
Err(Error::config_parse(m.into()))
}
None => Err(Error::config_parse("unexpected EOF".into())),
}
}
fn eat_if(&mut self, target: char) -> bool {
match self.it.peek() {
Some(&(_, c)) if c == target => {
self.it.next();
true
}
_ => false,
}
}
fn keyword(&mut self) -> Option<&'a str> {
let s = self.take_while(|c| match c {
c if c.is_whitespace() => false,
'=' => false,
_ => true,
});
if s.is_empty() {
None
} else {
Some(s)
}
}
fn value(&mut self) -> Result<String, Error> {
let value = if self.eat_if('\'') {
let value = self.quoted_value()?;
self.eat('\'')?;
value
} else {
self.simple_value()?
};
Ok(value)
}
fn simple_value(&mut self) -> Result<String, Error> {
let mut value = String::new();
while let Some(&(_, c)) = self.it.peek() {
if c.is_whitespace() {
break;
}
self.it.next();
if c == '\\' {
if let Some((_, c2)) = self.it.next() {
value.push(c2);
}
} else {
value.push(c);
}
}
if value.is_empty() {
return Err(Error::config_parse("unexpected EOF".into()));
}
Ok(value)
}
fn quoted_value(&mut self) -> Result<String, Error> {
let mut value = String::new();
while let Some(&(_, c)) = self.it.peek() {
if c == '\'' {
return Ok(value);
}
self.it.next();
if c == '\\' {
if let Some((_, c2)) = self.it.next() {
value.push(c2);
}
} else {
value.push(c);
}
}
Err(Error::config_parse(
"unterminated quoted connection parameter value".into(),
))
}
fn parameter(&mut self) -> Result<Option<(&'a str, String)>, Error> {
self.skip_ws();
let keyword = match self.keyword() {
Some(keyword) => keyword,
None => return Ok(None),
};
self.skip_ws();
self.eat('=')?;
self.skip_ws();
let value = self.value()?;
Ok(Some((keyword, value)))
}
}
// This is a pretty sloppy "URL" parser, but it matches the behavior of libpq, where things really aren't very strict
struct UrlParser<'a> {
s: &'a str,
config: Config,
}
impl<'a> UrlParser<'a> {
fn parse(s: &'a str) -> Result<Option<Config>, Error> {
let s = match Self::remove_url_prefix(s) {
Some(s) => s,
None => return Ok(None),
};
let mut parser = UrlParser {
s,
config: Config::new(),
};
parser.parse_credentials()?;
parser.parse_host()?;
parser.parse_path()?;
parser.parse_params()?;
Ok(Some(parser.config))
}
fn remove_url_prefix(s: &str) -> Option<&str> {
for prefix in &["postgres://", "postgresql://"] {
if let Some(stripped) = s.strip_prefix(prefix) {
return Some(stripped);
}
}
None
}
fn take_until(&mut self, end: &[char]) -> Option<&'a str> {
match self.s.find(end) {
Some(pos) => {
let (head, tail) = self.s.split_at(pos);
self.s = tail;
Some(head)
}
None => None,
}
}
fn take_all(&mut self) -> &'a str {
mem::take(&mut self.s)
}
fn eat_byte(&mut self) {
self.s = &self.s[1..];
}
fn parse_credentials(&mut self) -> Result<(), Error> {
let creds = match self.take_until(&['@']) {
Some(creds) => creds,
None => return Ok(()),
};
self.eat_byte();
let mut it = creds.splitn(2, ':');
let user = self.decode(it.next().unwrap())?;
self.config.user(&user);
if let Some(password) = it.next() {
let password = Cow::from(percent_encoding::percent_decode(password.as_bytes()));
self.config.password(password);
}
Ok(())
}
fn parse_host(&mut self) -> Result<(), Error> {
let host = match self.take_until(&['/', '?']) {
Some(host) => host,
None => self.take_all(),
};
if host.is_empty() {
return Ok(());
}
for chunk in host.split(',') {
let (host, port) = if chunk.starts_with('[') {
let idx = match chunk.find(']') {
Some(idx) => idx,
None => return Err(Error::config_parse(InvalidValue("host").into())),
};
let host = &chunk[1..idx];
let remaining = &chunk[idx + 1..];
let port = if let Some(port) = remaining.strip_prefix(':') {
Some(port)
} else if remaining.is_empty() {
None
} else {
return Err(Error::config_parse(InvalidValue("host").into()));
};
(host, port)
} else {
let mut it = chunk.splitn(2, ':');
(it.next().unwrap(), it.next())
};
self.host_param(host)?;
let port = self.decode(port.unwrap_or("5432"))?;
self.config.param("port", &port)?;
}
Ok(())
}
fn parse_path(&mut self) -> Result<(), Error> {
if !self.s.starts_with('/') {
return Ok(());
}
self.eat_byte();
let dbname = match self.take_until(&['?']) {
Some(dbname) => dbname,
None => self.take_all(),
};
if !dbname.is_empty() {
self.config.dbname(&self.decode(dbname)?);
}
Ok(())
}
fn parse_params(&mut self) -> Result<(), Error> {
if !self.s.starts_with('?') {
return Ok(());
}
self.eat_byte();
while !self.s.is_empty() {
let key = match self.take_until(&['=']) {
Some(key) => self.decode(key)?,
None => return Err(Error::config_parse("unterminated parameter".into())),
};
self.eat_byte();
let value = match self.take_until(&['&']) {
Some(value) => {
self.eat_byte();
value
}
None => self.take_all(),
};
if key == "host" {
self.host_param(value)?;
} else {
let value = self.decode(value)?;
self.config.param(&key, &value)?;
}
}
Ok(())
}
fn host_param(&mut self, s: &str) -> Result<(), Error> {
let s = self.decode(s)?;
self.config.param("host", &s)
}
fn decode(&self, s: &'a str) -> Result<Cow<'a, str>, Error> {
percent_encoding::percent_decode(s.as_bytes())
.decode_utf8()
.map_err(|e| Error::config_parse(e.into()))
}
}

View File

@@ -19,18 +19,38 @@ pub async fn connect<T>(
where
T: MakeTlsConnect<TcpStream>,
{
let hostname = match &config.host {
Host::Tcp(host) => host.as_str(),
};
let tls = tls
.make_tls_connect(hostname)
.map_err(|e| Error::tls(e.into()))?;
match connect_once(&config.host, config.port, tls, config).await {
Ok((client, connection)) => Ok((client, connection)),
Err(e) => Err(e),
if config.host.is_empty() {
return Err(Error::config("host missing".into()));
}
if config.port.len() > 1 && config.port.len() != config.host.len() {
return Err(Error::config("invalid number of ports".into()));
}
let mut error = None;
for (i, host) in config.host.iter().enumerate() {
let port = config
.port
.get(i)
.or_else(|| config.port.first())
.copied()
.unwrap_or(5432);
let hostname = match host {
Host::Tcp(host) => host.as_str(),
};
let tls = tls
.make_tls_connect(hostname)
.map_err(|e| Error::tls(e.into()))?;
match connect_once(host, port, tls, config).await {
Ok((client, connection)) => return Ok((client, connection)),
Err(e) => error = Some(e),
}
}
Err(error.unwrap())
}
async fn connect_once<T>(

View File

@@ -7,6 +7,7 @@ use crate::Error;
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt};
use postgres_protocol2::authentication;
use postgres_protocol2::authentication::sasl;
use postgres_protocol2::authentication::sasl::ScramSha256;
use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody};
@@ -173,11 +174,25 @@ where
authenticate_password(stream, pass).await?;
}
Some(Message::AuthenticationMd5Password(body)) => {
can_skip_channel_binding(config)?;
let user = config
.user
.as_ref()
.ok_or_else(|| Error::config("user missing".into()))?;
let pass = config
.password
.as_ref()
.ok_or_else(|| Error::config("password missing".into()))?;
let output = authentication::md5_hash(user.as_bytes(), pass, body.salt());
authenticate_password(stream, output.as_bytes()).await?;
}
Some(Message::AuthenticationSasl(body)) => {
authenticate_sasl(stream, body, config).await?;
}
Some(Message::AuthenticationMd5Password)
| Some(Message::AuthenticationKerberosV5)
Some(Message::AuthenticationKerberosV5)
| Some(Message::AuthenticationScmCredential)
| Some(Message::AuthenticationGss)
| Some(Message::AuthenticationSspi) => {

View File

@@ -349,6 +349,7 @@ enum Kind {
Parse,
Encode,
Authentication,
ConfigParse,
Config,
Connect,
Timeout,
@@ -385,6 +386,7 @@ impl fmt::Display for Error {
Kind::Parse => fmt.write_str("error parsing response from server")?,
Kind::Encode => fmt.write_str("error encoding message to server")?,
Kind::Authentication => fmt.write_str("authentication error")?,
Kind::ConfigParse => fmt.write_str("invalid connection string")?,
Kind::Config => fmt.write_str("invalid configuration")?,
Kind::Connect => fmt.write_str("error connecting to server")?,
Kind::Timeout => fmt.write_str("timeout waiting for server")?,
@@ -480,6 +482,10 @@ impl Error {
Error::new(Kind::Authentication, Some(e))
}
pub(crate) fn config_parse(e: Box<dyn error::Error + Sync + Send>) -> Error {
Error::new(Kind::ConfigParse, Some(e))
}
pub(crate) fn config(e: Box<dyn error::Error + Sync + Send>) -> Error {
Error::new(Kind::Config, Some(e))
}

View File

@@ -13,12 +13,14 @@ pub use crate::query::RowStream;
pub use crate::row::{Row, SimpleQueryRow};
pub use crate::simple_query::SimpleQueryStream;
pub use crate::statement::{Column, Statement};
use crate::tls::MakeTlsConnect;
pub use crate::tls::NoTls;
pub use crate::to_statement::ToStatement;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;
use postgres_protocol2::message::backend::ReadyForQueryBody;
use tokio::net::TcpStream;
/// After executing a query, the connection will be in one of these states
#[derive(Clone, Copy, Debug, PartialEq)]
@@ -70,6 +72,24 @@ mod transaction;
mod transaction_builder;
pub mod types;
/// A convenience function which parses a connection string and connects to the database.
///
/// See the documentation for [`Config`] for details on the connection string format.
///
/// Requires the `runtime` Cargo feature (enabled by default).
///
/// [`Config`]: config/struct.Config.html
pub async fn connect<T>(
config: &str,
tls: T,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where
T: MakeTlsConnect<TcpStream>,
{
let config = config.parse::<Config>()?;
config.connect(tls).await
}
/// An asynchronous notification.
#[derive(Clone, Debug)]
pub struct Notification {

View File

@@ -112,38 +112,30 @@ impl MetadataRecord {
};
// Next, filter the metadata record by shard.
match metadata_record {
Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) => {
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
{
metadata_record = None
}
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
// of the main relation. These are sharded and managed just like regular relation pages.
// See: https://github.com/neondatabase/neon/issues/9855
if let Some(
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
) = metadata_record
{
let is_local_vm_page = |heap_blk| {
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
};
// Send the old and new VM page updates to their respective shards.
clear_vm_bits.old_heap_blkno = clear_vm_bits
.old_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
clear_vm_bits.new_heap_blkno = clear_vm_bits
.new_heap_blkno
.filter(|&blkno| is_local_vm_page(blkno));
// If neither VM page belongs to this shard, discard the record.
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none() {
metadata_record = None
}
Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
// Filter LogicalMessage records (AUX files) to only be stored on shard zero
if !shard.is_shard_zero() {
metadata_record = None;
}
}
_ => {}
}
Ok(metadata_record)

View File

@@ -345,7 +345,6 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(3, 1) => AuxFileV2::Recognized("pg_stat/pgstat.stat", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,

View File

@@ -39,7 +39,6 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_PG_STAT: u8 = 0x03;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// Encode the aux file into a fixed-size key.
@@ -54,7 +53,6 @@ const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// * pg_logical/replorigin_checkpoint -> 0x0103
/// * pg_logical/others -> 0x01FF
/// * pg_replslot/ -> 0x0201
/// * pg_stat/pgstat.stat -> 0x0301
/// * others -> 0xFFFF
///
/// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
@@ -77,8 +75,6 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_stat/pgstat.stat") {
aux_hash_to_metadata_key(AUX_DIR_PG_STAT, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(

View File

@@ -255,31 +255,6 @@ where
async fn send_tarball(mut self) -> Result<(), BasebackupError> {
// TODO include checksum
// Construct the pg_control file from the persisted checkpoint and pg_control
// information. But we only add this to the tarball at the end, so that if the
// writing is interrupted half-way through, the resulting incomplete tarball will
// be missing the pg_control file, which prevents PostgreSQL from starting up on
// it. With proper error handling, you should never try to start up from an
// incomplete basebackup in the first place, of course, but this is a nice little
// extra safety measure.
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier, was_shutdown) =
postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
let pgversion = self.timeline.pg_version;
@@ -417,10 +392,6 @@ where
// In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
// but now we should handle (skip) it for backward compatibility.
continue;
} else if path == "pg_stat/pgstat.stat" && !was_shutdown {
// Drop statistic in case of abnormal termination, i.e. if we're not starting from the exact LSN
// of a shutdown checkpoint.
continue;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
@@ -482,9 +453,8 @@ where
)))
});
// Last, add the pg_control file and bootstrap WAL segment.
self.add_pgcontrol_file(pg_control_bytes, system_identifier)
.await?;
// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
self.ar.finish().await.map_err(BasebackupError::Client)?;
debug!("all tarred up!");
Ok(())
@@ -687,11 +657,7 @@ where
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(
&mut self,
pg_control_bytes: Bytes,
system_identifier: u64,
) -> Result<(), BasebackupError> {
async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
@@ -714,6 +680,24 @@ where
.await
.map_err(BasebackupError::Client)?;
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar

View File

@@ -636,59 +636,45 @@ fn start_pageserver(
tokio::net::TcpListener::from_std(pageserver_listener).context("create tokio listener")?
});
// All started up! Now just sit and wait for shutdown signal.
BACKGROUND_RUNTIME.block_on(async move {
let signal_token = CancellationToken::new();
let signal_cancel = signal_token.child_token();
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// Spawn signal handlers. Runs in a loop since we want to be responsive to multiple signals
// even after triggering shutdown (e.g. a SIGQUIT after a slow SIGTERM shutdown). See:
// https://github.com/neondatabase/neon/issues/9740.
tokio::spawn(async move {
// All started up! Now just sit and wait for shutdown signal.
{
BACKGROUND_RUNTIME.block_on(async move {
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap();
let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap();
loop {
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode.");
std::process::exit(111);
}
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
if !signal_token.is_cancelled() {
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode.");
signal_token.cancel();
} else {
info!("Got signal {signal}. Already shutting down.");
let signal = tokio::select! {
_ = sigquit.recv() => {
info!("Got signal SIGQUIT. Terminating in immediate shutdown mode",);
std::process::exit(111);
}
}
});
_ = sigint.recv() => { "SIGINT" },
_ = sigterm.recv() => { "SIGTERM" },
};
// Wait for cancellation signal and shut down the pageserver.
//
// This cancels the `shutdown_pageserver` cancellation tree. Right now that tree doesn't
// reach very far, and `task_mgr` is used instead. The plan is to change that over time.
signal_cancel.cancelled().await;
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);
shutdown_pageserver.cancel();
pageserver::shutdown_pageserver(
http_endpoint_listener,
page_service,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,
background_purges,
deletion_queue.clone(),
secondary_controller_tasks,
0,
)
.await;
unreachable!();
})
// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
pageserver::shutdown_pageserver(
http_endpoint_listener,
page_service,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,
background_purges,
deletion_queue.clone(),
secondary_controller_tasks,
0,
)
.await;
unreachable!()
})
}
}
async fn create_remote_storage_client(

View File

@@ -115,10 +115,6 @@ impl ControllerUpcallClient {
Ok(res)
}
pub(crate) fn base_url(&self) -> &Url {
&self.base_url
}
}
impl ControlPlaneGenerationsApi for ControllerUpcallClient {
@@ -195,15 +191,13 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
let request = ReAttachRequest {
node_id: self.node_id,
register: register.clone(),
register,
};
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants (node {}, register: {:?})",
response.tenants.len(),
self.node_id,
register,
"Received re-attach response with {} tenants",
response.tenants.len()
);
failpoint_support::sleep_millis_async!("control-plane-client-re-attach");

View File

@@ -575,24 +575,18 @@ async fn import_file(
} else if file_path.starts_with("pg_xact") {
let slru = SlruKind::Clog;
if modification.tline.tenant_shard_id.is_shard_zero() {
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported clog slru");
}
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported clog slru");
} else if file_path.starts_with("pg_multixact/offsets") {
let slru = SlruKind::MultiXactOffsets;
if modification.tline.tenant_shard_id.is_shard_zero() {
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact offsets slru");
}
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact offsets slru");
} else if file_path.starts_with("pg_multixact/members") {
let slru = SlruKind::MultiXactMembers;
if modification.tline.tenant_shard_id.is_shard_zero() {
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact members slru");
}
import_slru(modification, slru, file_path, reader, len, ctx).await?;
debug!("imported multixact members slru");
} else if file_path.starts_with("pg_twophase") {
let bytes = read_all_bytes(reader).await?;

View File

@@ -217,16 +217,31 @@ impl<'a> ScanLatencyOngoingRecording<'a> {
ScanLatencyOngoingRecording { parent, start }
}
pub(crate) fn observe(self) {
pub(crate) fn observe(self, throttled: Option<Duration>) {
let elapsed = self.start.elapsed();
self.parent.observe(elapsed.as_secs_f64());
let ex_throttled = if let Some(throttled) = throttled {
elapsed.checked_sub(throttled)
} else {
Some(elapsed)
};
if let Some(ex_throttled) = ex_throttled {
self.parent.observe(ex_throttled.as_secs_f64());
} else {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!("error deducting time spent throttled; this message is logged at a global rate limit");
});
}
}
}
pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_get_vectored_seconds",
"Time spent in get_vectored.",
"Time spent in get_vectored, excluding time spent in timeline_get_throttle.",
&["task_kind"],
CRITICAL_OP_BUCKETS.into(),
)
@@ -249,7 +264,7 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
pub(crate) static SCAN_LATENCY: Lazy<ScanLatency> = Lazy::new(|| {
let inner = register_histogram_vec!(
"pageserver_scan_seconds",
"Time spent in scan.",
"Time spent in scan, excluding time spent in timeline_get_throttle.",
&["task_kind"],
CRITICAL_OP_BUCKETS.into(),
)
@@ -1212,44 +1227,11 @@ pub(crate) struct SmgrOpTimer {
per_timeline_latency_histo: Option<Histogram>,
start: Instant,
throttled: Duration,
op: SmgrQueryType,
}
impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else {
return;
};
self.throttled += *throttle;
}
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
let elapsed = match elapsed.checked_sub(self.throttled) {
Some(elapsed) => elapsed,
None => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
Lazy::new(|| {
Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
RateLimit::new(Duration::from_secs(10))
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
rate_limit.call(|| {
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
});
elapsed // un-throttled time, more info than just saturating to 0
}
};
let elapsed = elapsed.as_secs_f64();
let elapsed = self.start.elapsed().as_secs_f64();
self.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
@@ -1509,8 +1491,6 @@ impl SmgrQueryTimePerTimeline {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start: started_at,
op,
throttled: Duration::ZERO,
}
}
@@ -3319,7 +3299,7 @@ pub(crate) mod tenant_throttling {
use once_cell::sync::Lazy;
use utils::shard::TenantShardId;
use crate::tenant::{self};
use crate::tenant::{self, throttle::Metric};
struct GlobalAndPerTenantIntCounter {
global: IntCounter,
@@ -3338,7 +3318,7 @@ pub(crate) mod tenant_throttling {
}
}
pub(crate) struct Metrics<const KIND: usize> {
pub(crate) struct TimelineGet {
count_accounted_start: GlobalAndPerTenantIntCounter,
count_accounted_finish: GlobalAndPerTenantIntCounter,
wait_time: GlobalAndPerTenantIntCounter,
@@ -3411,41 +3391,40 @@ pub(crate) mod tenant_throttling {
.unwrap()
});
const KINDS: &[&str] = &["pagestream"];
pub type Pagestream = Metrics<0>;
const KIND: &str = "timeline_get";
impl<const KIND: usize> Metrics<KIND> {
impl TimelineGet {
pub(crate) fn new(tenant_shard_id: &TenantShardId) -> Self {
let per_tenant_label_values = &[
KINDS[KIND],
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
];
Metrics {
TimelineGet {
count_accounted_start: {
GlobalAndPerTenantIntCounter {
global: COUNT_ACCOUNTED_START.with_label_values(&[KINDS[KIND]]),
global: COUNT_ACCOUNTED_START.with_label_values(&[KIND]),
per_tenant: COUNT_ACCOUNTED_START_PER_TENANT
.with_label_values(per_tenant_label_values),
}
},
count_accounted_finish: {
GlobalAndPerTenantIntCounter {
global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KINDS[KIND]]),
global: COUNT_ACCOUNTED_FINISH.with_label_values(&[KIND]),
per_tenant: COUNT_ACCOUNTED_FINISH_PER_TENANT
.with_label_values(per_tenant_label_values),
}
},
wait_time: {
GlobalAndPerTenantIntCounter {
global: WAIT_USECS.with_label_values(&[KINDS[KIND]]),
global: WAIT_USECS.with_label_values(&[KIND]),
per_tenant: WAIT_USECS_PER_TENANT
.with_label_values(per_tenant_label_values),
}
},
count_throttled: {
GlobalAndPerTenantIntCounter {
global: WAIT_COUNT.with_label_values(&[KINDS[KIND]]),
global: WAIT_COUNT.with_label_values(&[KIND]),
per_tenant: WAIT_COUNT_PER_TENANT
.with_label_values(per_tenant_label_values),
}
@@ -3468,17 +3447,15 @@ pub(crate) mod tenant_throttling {
&WAIT_USECS_PER_TENANT,
&WAIT_COUNT_PER_TENANT,
] {
for kind in KINDS {
let _ = m.remove_label_values(&[
kind,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]);
}
let _ = m.remove_label_values(&[
KIND,
&tenant_shard_id.tenant_id.to_string(),
&tenant_shard_id.shard_slug().to_string(),
]);
}
}
impl<const KIND: usize> tenant::throttle::Metric for Metrics<KIND> {
impl Metric for TimelineGet {
#[inline(always)]
fn accounting_start(&self) {
self.count_accounted_start.inc();

View File

@@ -574,41 +574,6 @@ enum BatchedFeMessage {
},
}
impl BatchedFeMessage {
async fn throttle(&mut self, cancel: &CancellationToken) -> Result<(), QueryError> {
let (shard, tokens, timers) = match self {
BatchedFeMessage::Exists { shard, timer, .. }
| BatchedFeMessage::Nblocks { shard, timer, .. }
| BatchedFeMessage::DbSize { shard, timer, .. }
| BatchedFeMessage::GetSlruSegment { shard, timer, .. } => {
(
shard,
// 1 token is probably under-estimating because these
// request handlers typically do several Timeline::get calls.
1,
itertools::Either::Left(std::iter::once(timer)),
)
}
BatchedFeMessage::GetPage { shard, pages, .. } => (
shard,
pages.len(),
itertools::Either::Right(pages.iter_mut().map(|(_, _, timer)| timer)),
),
BatchedFeMessage::RespondError { .. } => return Ok(()),
};
let throttled = tokio::select! {
throttled = shard.pagestream_throttle.throttle(tokens) => { throttled }
_ = cancel.cancelled() => {
return Err(QueryError::Shutdown);
}
};
for timer in timers {
timer.deduct_throttle(&throttled);
}
Ok(())
}
}
impl PageServerHandler {
pub fn new(
tenant_manager: Arc<TenantManager>,
@@ -1192,18 +1157,13 @@ impl PageServerHandler {
Ok(msg) => msg,
Err(e) => break e,
};
let mut msg = match msg {
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
return ((pgb_reader, timeline_handles), Ok(()));
}
};
if let Err(cancelled) = msg.throttle(&self.cancel).await {
break cancelled;
}
let err = self
.pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx)
.await;
@@ -1361,13 +1321,12 @@ impl PageServerHandler {
return Ok(());
}
};
let mut batch = match batch {
let batch = match batch {
Ok(batch) => batch,
Err(e) => {
return Err(e);
}
};
batch.throttle(&self.cancel).await?;
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.await?;
}

View File

@@ -530,7 +530,6 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.await?;
@@ -553,7 +552,6 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
self.get(key, lsn, ctx).await
}
@@ -566,7 +564,6 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let key = slru_segment_size_to_key(kind, segno);
let mut buf = version.get(self, key, ctx).await?;
Ok(buf.get_u32_le())
@@ -580,7 +577,6 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
// fetch directory listing
let key = slru_dir_to_key(kind);
let buf = version.get(self, key, ctx).await?;
@@ -1051,28 +1047,26 @@ impl Timeline {
}
// Iterate SLRUs next
if self.tenant_shard_id.is_shard_zero() {
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();
for kind in [
SlruKind::Clog,
SlruKind::MultiXactMembers,
SlruKind::MultiXactOffsets,
] {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
let segsize_key = slru_segment_size_to_key(kind, segno);
let mut buf = self.get(segsize_key, lsn, ctx).await?;
let segsize = buf.get_u32_le();
result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
}
result.add_range(
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
);
result.add_key(segsize_key);
}
}
@@ -1474,10 +1468,6 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
rec: NeonWalRecord,
) -> anyhow::Result<()> {
if !self.tline.tenant_shard_id.is_shard_zero() {
return Ok(());
}
self.put(
slru_block_to_key(kind, segno, blknum),
Value::WalRecord(rec),
@@ -1511,8 +1501,6 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
@@ -1554,7 +1542,6 @@ impl<'a> DatadirModification<'a> {
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
@@ -1866,8 +1853,6 @@ impl<'a> DatadirModification<'a> {
nblocks: BlockNumber,
ctx: &RequestContext,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Add it to the directory entry
let dir_key = slru_dir_to_key(kind);
let buf = self.get(dir_key, ctx).await?;
@@ -1900,8 +1885,6 @@ impl<'a> DatadirModification<'a> {
segno: u32,
nblocks: BlockNumber,
) -> anyhow::Result<()> {
assert!(self.tline.tenant_shard_id.is_shard_zero());
// Put size
let size_key = slru_segment_size_to_key(kind, segno);
let buf = nblocks.to_le_bytes();

View File

@@ -357,8 +357,8 @@ pub struct Tenant {
/// Throttle applied at the top of [`Timeline::get`].
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) pagestream_throttle:
Arc<throttle::Throttle<crate::metrics::tenant_throttling::Pagestream>>,
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
/// An ongoing timeline detach concurrency limiter.
///
@@ -1678,7 +1678,7 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client,
pagestream_throttle: self.pagestream_throttle.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
LoadTimelineCause::Attach,
@@ -3835,7 +3835,7 @@ impl Tenant {
}
}
fn get_pagestream_throttle_config(
fn get_timeline_get_throttle_config(
psconf: &'static PageServerConf,
overrides: &TenantConfOpt,
) -> throttle::Config {
@@ -3846,8 +3846,8 @@ impl Tenant {
}
pub(crate) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
let conf = Self::get_pagestream_throttle_config(self.conf, new_conf);
self.pagestream_throttle.reconfigure(conf)
let conf = Self::get_timeline_get_throttle_config(self.conf, new_conf);
self.timeline_get_throttle.reconfigure(conf)
}
/// Helper function to create a new Timeline struct.
@@ -4009,9 +4009,9 @@ impl Tenant {
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
cancel: CancellationToken::default(),
gate: Gate::default(),
pagestream_throttle: Arc::new(throttle::Throttle::new(
Tenant::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf),
crate::metrics::tenant_throttling::Metrics::new(&tenant_shard_id),
timeline_get_throttle: Arc::new(throttle::Throttle::new(
Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf),
crate::metrics::tenant_throttling::TimelineGet::new(&tenant_shard_id),
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
@@ -4909,7 +4909,7 @@ impl Tenant {
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
TimelineResources {
remote_client: self.build_timeline_remote_client(timeline_id),
pagestream_throttle: self.pagestream_throttle.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}
}

View File

@@ -347,7 +347,7 @@ async fn init_load_generations(
);
emergency_generations(tenant_confs)
} else if let Some(client) = ControllerUpcallClient::new(conf, cancel) {
info!("Calling {} API to re-attach tenants", client.base_url());
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {
Ok(tenants) => tenants

View File

@@ -2564,9 +2564,9 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
}
/// Given the key of a tenant manifest, parse out the generation number
pub fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
pub(crate) fn parse_remote_tenant_manifest_path(path: RemotePath) -> Option<Generation> {
static RE: OnceLock<Regex> = OnceLock::new();
let re = RE.get_or_init(|| Regex::new(r".*tenant-manifest-([0-9a-f]{8}).json").unwrap());
let re = RE.get_or_init(|| Regex::new(r".+tenant-manifest-([0-9a-f]{8}).json").unwrap());
re.captures(path.get_path().as_str())
.and_then(|c| c.get(1))
.and_then(|m| Generation::parse_suffix(m.as_str()))

View File

@@ -43,7 +43,7 @@ impl TenantManifest {
offloaded_timelines: vec![],
}
}
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<Self>(bytes)
}

View File

@@ -52,8 +52,8 @@ use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::DBDIR_KEY;
use pageserver_api::key::{Key, KEY_SIZE};
use pageserver_api::key::{AUX_FILES_KEY, DBDIR_KEY};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
@@ -969,11 +969,7 @@ impl DeltaLayerInner {
.as_slice()
.iter()
.filter_map(|(_, blob_meta)| {
if blob_meta.key.is_rel_dir_key()
|| blob_meta.key == DBDIR_KEY
|| blob_meta.key == AUX_FILES_KEY
|| blob_meta.key.is_aux_file_key()
{
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
// The size of values for these keys is unbounded and can
// grow very large in pathological cases.
None

View File

@@ -49,8 +49,8 @@ use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::DBDIR_KEY;
use pageserver_api::key::{Key, KEY_SIZE};
use pageserver_api::key::{AUX_FILES_KEY, DBDIR_KEY};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
@@ -591,11 +591,7 @@ impl ImageLayerInner {
.as_slice()
.iter()
.filter_map(|(_, blob_meta)| {
if blob_meta.key.is_rel_dir_key()
|| blob_meta.key == DBDIR_KEY
|| blob_meta.key == AUX_FILES_KEY
|| blob_meta.key.is_aux_file_key()
{
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
// The size of values for these keys is unbounded and can
// grow very large in pathological cases.
None

View File

@@ -471,14 +471,14 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
// Or just spawn another background loop for this throttle, it's not like it's super costly.
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
info_span!(parent: None, "timeline_get_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.timeline_get_throttle.reset_stats();
if count_throttled == 0 {
return;
}
let allowed_rps = tenant.pagestream_throttle.steady_rps();
let allowed_rps = tenant.timeline_get_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),

View File

@@ -1,4 +1,5 @@
use std::{
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@@ -7,8 +8,12 @@ use std::{
};
use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::error;
use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
use crate::{context::RequestContext, task_mgr::TaskKind};
/// Throttle for `async` functions.
///
/// Runtime reconfigurable.
@@ -30,7 +35,7 @@ pub struct Throttle<M: Metric> {
}
pub struct Inner {
enabled: bool,
task_kinds: EnumSet<TaskKind>,
rate_limiter: Arc<RateLimiter>,
}
@@ -74,12 +79,26 @@ where
}
fn new_inner(config: Config) -> Inner {
let Config {
enabled,
task_kinds,
initial,
refill_interval,
refill_amount,
max,
} = config;
let task_kinds: EnumSet<TaskKind> = task_kinds
.iter()
.filter_map(|s| match TaskKind::from_str(s) {
Ok(v) => Some(v),
Err(e) => {
// TODO: avoid this failure mode
error!(
"cannot parse task kind, ignoring for rate limiting {}",
utils::error::report_compact_sources(&e)
);
None
}
})
.collect();
// steady rate, we expect `refill_amount` requests per `refill_interval`.
// dividing gives us the rps.
@@ -93,7 +112,7 @@ where
let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
Inner {
enabled: enabled.is_enabled(),
task_kinds,
rate_limiter: Arc::new(rate_limiter),
}
}
@@ -122,13 +141,11 @@ where
self.inner.load().rate_limiter.steady_rps()
}
pub async fn throttle(&self, key_count: usize) -> Option<Duration> {
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
let inner = self.inner.load_full(); // clones the `Inner` Arc
if !inner.enabled {
if !inner.task_kinds.contains(ctx.task_kind()) {
return None;
}
};
let start = std::time::Instant::now();
self.metric.accounting_start();

View File

@@ -208,8 +208,8 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub pagestream_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::Pagestream>>,
pub timeline_get_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
@@ -411,9 +411,9 @@ pub struct Timeline {
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
gc_lock: tokio::sync::Mutex<()>,
/// Cloned from [`super::Tenant::pagestream_throttle`] on construction.
pub(crate) pagestream_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::Pagestream>>,
/// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
timeline_get_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
/// Size estimator for aux file v2
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
@@ -949,7 +949,7 @@ impl Timeline {
/// If a remote layer file is needed, it is downloaded as part of this
/// call.
///
/// This method enforces [`Self::pagestream_throttle`] internally.
/// This method enforces [`Self::timeline_get_throttle`] internally.
///
/// NOTE: It is considered an error to 'get' a key that doesn't exist. The
/// abstraction above this needs to store suitable metadata to track what
@@ -977,6 +977,8 @@ impl Timeline {
// page_service.
debug_assert!(!self.shard_identity.is_key_disposable(&key));
self.timeline_get_throttle.throttle(ctx, 1).await;
let keyspace = KeySpace {
ranges: vec![key..key.next()],
};
@@ -1056,6 +1058,14 @@ impl Timeline {
.for_task_kind(ctx.task_kind())
.map(|metric| (metric, Instant::now()));
// start counting after throttle so that throttle time
// is always less than observation time and we don't
// underflow when computing `ex_throttled` below.
let throttled = self
.timeline_get_throttle
.throttle(ctx, key_count as usize)
.await;
let res = self
.get_vectored_impl(
keyspace.clone(),
@@ -1067,7 +1077,23 @@ impl Timeline {
if let Some((metric, start)) = start {
let elapsed = start.elapsed();
metric.observe(elapsed.as_secs_f64());
let ex_throttled = if let Some(throttled) = throttled {
elapsed.checked_sub(throttled)
} else {
Some(elapsed)
};
if let Some(ex_throttled) = ex_throttled {
metric.observe(ex_throttled.as_secs_f64());
} else {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!("error deducting time spent throttled; this message is logged at a global rate limit");
});
}
}
res
@@ -1112,6 +1138,16 @@ impl Timeline {
.for_task_kind(ctx.task_kind())
.map(ScanLatencyOngoingRecording::start_recording);
// start counting after throttle so that throttle time
// is always less than observation time and we don't
// underflow when computing the `ex_throttled` value in
// `recording.observe(throttled)` below.
let throttled = self
.timeline_get_throttle
// assume scan = 1 quota for now until we find a better way to process this
.throttle(ctx, 1)
.await;
let vectored_res = self
.get_vectored_impl(
keyspace.clone(),
@@ -1122,7 +1158,7 @@ impl Timeline {
.await;
if let Some(recording) = start {
recording.observe();
recording.observe(throttled);
}
vectored_res
@@ -2338,7 +2374,7 @@ impl Timeline {
standby_horizon: AtomicLsn::new(0),
pagestream_throttle: resources.pagestream_throttle,
timeline_get_throttle: resources.timeline_get_throttle,
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
@@ -4239,12 +4275,10 @@ impl Timeline {
// Normal path: we have written some data into the new image layer for this
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let file_size = desc.file_size;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!(
"created image layer for metadata {} size {}",
image_layer.local_path(),
file_size,
"created image layer for metadata {}",
image_layer.local_path()
);
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),

View File

@@ -298,7 +298,7 @@ impl DeleteTimelineFlow {
None, // Ancestor is not needed for deletion.
TimelineResources {
remote_client,
pagestream_throttle: tenant.pagestream_throttle.clone(),
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
l0_flush_global_state: tenant.l0_flush_global_state.clone(),
},
// Important. We dont pass ancestor above because it can be missing.

View File

@@ -129,23 +129,22 @@ impl Flow {
}
// Import SLRUs
if self.timeline.tenant_shard_id.is_shard_zero() {
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
.await?;
// pg_multixact/members (01:01 keyspace)
self.import_slru(
SlruKind::MultiXactMembers,
&self.storage.pgdata().join("pg_multixact/members"),
)
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
.await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(
SlruKind::MultiXactOffsets,
&self.storage.pgdata().join("pg_multixact/offsets"),
)
.await?;
}
// pg_multixact/members (01:01 keyspace)
self.import_slru(
SlruKind::MultiXactMembers,
&self.storage.pgdata().join("pg_multixact/members"),
)
.await?;
// pg_multixact/offsets (01:02 keyspace)
self.import_slru(
SlruKind::MultiXactOffsets,
&self.storage.pgdata().join("pg_multixact/offsets"),
)
.await?;
// Import pg_twophase.
// TODO: as empty
@@ -303,8 +302,6 @@ impl Flow {
}
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
assert!(self.timeline.tenant_shard_id.is_shard_zero());
let segments = self.storage.listfilesindir(path).await?;
let segments: Vec<(String, u32, usize)> = segments
.into_iter()
@@ -340,6 +337,7 @@ impl Flow {
debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
self.tasks
.push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
*self.timeline.get_shard_identity(),
start_key..end_key,
&p,
self.storage.clone(),
@@ -633,14 +631,21 @@ impl ImportTask for ImportRelBlocksTask {
}
struct ImportSlruBlocksTask {
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: RemotePath,
storage: RemoteStorageWrapper,
}
impl ImportSlruBlocksTask {
fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
fn new(
shard_identity: ShardIdentity,
key_range: Range<Key>,
path: &RemotePath,
storage: RemoteStorageWrapper,
) -> Self {
ImportSlruBlocksTask {
shard_identity,
key_range,
path: path.clone(),
storage,
@@ -668,13 +673,17 @@ impl ImportTask for ImportSlruBlocksTask {
let mut file_offset = 0;
while blknum < end_blk {
let key = slru_block_to_key(kind, segno, blknum);
assert!(
!self.shard_identity.is_key_disposable(&key),
"SLRU keys need to go into every shard"
);
let buf = &buf[file_offset..(file_offset + 8192)];
file_offset += 8192;
layer_writer
.put_image(key, Bytes::copy_from_slice(buf), ctx)
.await?;
nimages += 1;
blknum += 1;
nimages += 1;
}
Ok(nimages)
}

View File

@@ -1187,50 +1187,6 @@ impl WalIngest {
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// NB: We abuse the Checkpoint.redo field:
//
// - In PostgreSQL, the Checkpoint struct doesn't store the information
// of whether this is an online checkpoint or a shutdown checkpoint. It's
// stored in the XLOG info field of the WAL record, shutdown checkpoints
// use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
// XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
// in the pageserver, however.
//
// - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
// checkpoint record, if it's a shutdown checkpoint. But when we are
// starting from a shutdown checkpoint, the basebackup LSN is the *end*
// of the shutdown checkpoint WAL record. That makes it difficult to
// correctly detect whether we're starting from a shutdown record or
// not.
//
// To address both of those issues, we store 0 in the redo field if it's
// an online checkpoint record, and the record's *end* LSN if it's a
// shutdown checkpoint. We don't need the original redo pointer in neon,
// because we don't perform WAL replay at startup anyway, so we can get
// away with abusing the redo field like this.
//
// XXX: Ideally, we would persist the extra information in a more
// explicit format, rather than repurpose the fields of the Postgres
// struct like this. However, we already have persisted data like this,
// so we need to maintain backwards compatibility.
//
// NB: We didn't originally have this convention, so there are still old
// persisted records that didn't do this. Before, we didn't update the
// persisted redo field at all. That means that old records have a bogus
// redo pointer that points to some old value, from the checkpoint record
// that was originally imported from the data directory. If it was a
// project created in Neon, that means it points to the first checkpoint
// after initdb. That's OK for our purposes: all such old checkpoints are
// treated as old online checkpoints when the basebackup is created.
cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
// Store the *end* LSN of the checkpoint record. Or to be precise,
// the start LSN of the *next* record, i.e. if the record ends
// exactly at page boundary, the redo LSN points to just after the
// page header on the next page.
lsn.into()
} else {
Lsn::INVALID.into()
};
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
@@ -1436,10 +1392,6 @@ impl WalIngest {
img: Bytes,
ctx: &RequestContext,
) -> Result<()> {
if !self.shard.is_shard_zero() {
return Ok(());
}
self.handle_slru_extend(modification, kind, segno, blknum, ctx)
.await?;
modification.put_slru_page_image(kind, segno, blknum, img)?;

View File

@@ -6,7 +6,7 @@ license.workspace = true
[features]
default = []
testing = ["dep:tokio-postgres"]
testing = []
[dependencies]
ahash.workspace = true
@@ -55,7 +55,6 @@ parquet.workspace = true
parquet_derive.workspace = true
pin-project-lite.workspace = true
postgres_backend.workspace = true
postgres-client = { package = "tokio-postgres2", path = "../libs/proxy/tokio-postgres2" }
postgres-protocol = { package = "postgres-protocol2", path = "../libs/proxy/postgres-protocol2" }
pq_proto.workspace = true
prometheus.workspace = true
@@ -82,7 +81,7 @@ subtle.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
tokio-postgres = { workspace = true, optional = true }
tokio-postgres = { package = "tokio-postgres2", path = "../libs/proxy/tokio-postgres2" }
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
@@ -120,4 +119,3 @@ rcgen.workspace = true
rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true

View File

@@ -66,7 +66,7 @@ pub(super) async fn authenticate(
Ok(ComputeCredentials {
info: creds,
keys: ComputeCredentialKeys::AuthKeys(postgres_client::config::AuthKeys::ScramSha256(
keys: ComputeCredentialKeys::AuthKeys(tokio_postgres::config::AuthKeys::ScramSha256(
scram_keys,
)),
})

View File

@@ -1,8 +1,8 @@
use async_trait::async_trait;
use postgres_client::config::SslMode;
use pq_proto::BeMessage as Be;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::SslMode;
use tracing::{info, info_span};
use super::ComputeCredentialKeys;
@@ -49,19 +49,13 @@ impl ReportableError for ConsoleRedirectError {
}
}
fn hello_message(
redirect_uri: &reqwest::Url,
session_id: &str,
duration: std::time::Duration,
) -> String {
let formatted_duration = humantime::format_duration(duration).to_string();
fn hello_message(redirect_uri: &reqwest::Url, session_id: &str) -> String {
format!(
concat![
"Welcome to Neon!\n",
"Authenticate by visiting (will expire in {duration}):\n",
"Authenticate by visiting:\n",
" {redirect_uri}{session_id}\n\n",
],
duration = formatted_duration,
redirect_uri = redirect_uri,
session_id = session_id,
)
@@ -124,11 +118,7 @@ async fn authenticate(
};
let span = info_span!("console_redirect", psql_session_id = &psql_session_id);
let greeting = hello_message(
link_uri,
&psql_session_id,
auth_config.console_redirect_confirmation_timeout,
);
let greeting = hello_message(link_uri, &psql_session_id);
// Give user a URL to spawn a new database.
info!(parent: &span, "sending the auth URL to the user");
@@ -161,8 +151,12 @@ async fn authenticate(
// This config should be self-contained, because we won't
// take username or dbname from client's startup message.
let mut config = compute::ConnCfg::new(db_info.host.to_string(), db_info.port);
config.dbname(&db_info.dbname).user(&db_info.user);
let mut config = compute::ConnCfg::new();
config
.host(&db_info.host)
.port(db_info.port)
.dbname(&db_info.dbname)
.user(&db_info.user);
ctx.set_dbname(db_info.dbname.into());
ctx.set_user(db_info.user.into());

View File

@@ -29,7 +29,12 @@ impl LocalBackend {
api: http::Endpoint::new(compute_ctl, http::new_client()),
},
node_info: NodeInfo {
config: ConnCfg::new(postgres_addr.ip().to_string(), postgres_addr.port()),
config: {
let mut cfg = ConnCfg::new();
cfg.host(&postgres_addr.ip().to_string());
cfg.port(postgres_addr.port());
cfg
},
// TODO(conrad): make this better reflect compute info rather than endpoint info.
aux: MetricsAuxInfo {
endpoint_id: EndpointIdTag::get_interner().get_or_intern("local"),

View File

@@ -11,8 +11,8 @@ pub use console_redirect::ConsoleRedirectBackend;
pub(crate) use console_redirect::ConsoleRedirectError;
use ipnet::{Ipv4Net, Ipv6Net};
use local::LocalBackend;
use postgres_client::config::AuthKeys;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::AuthKeys;
use tracing::{debug, info, warn};
use crate::auth::credentials::check_peer_addr_is_in_list;

View File

@@ -227,7 +227,7 @@ pub(crate) async fn validate_password_and_exchange(
};
Ok(sasl::Outcome::Success(ComputeCredentialKeys::AuthKeys(
postgres_client::config::AuthKeys::ScramSha256(keys),
tokio_postgres::config::AuthKeys::ScramSha256(keys),
)))
}
}

View File

@@ -3,11 +3,11 @@ use std::sync::Arc;
use dashmap::DashMap;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use postgres_client::{CancelToken, NoTls};
use pq_proto::CancelKeyData;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_postgres::{CancelToken, NoTls};
use tracing::{debug, info};
use uuid::Uuid;
@@ -44,7 +44,7 @@ pub(crate) enum CancelError {
IO(#[from] std::io::Error),
#[error("{0}")]
Postgres(#[from] postgres_client::Error),
Postgres(#[from] tokio_postgres::Error),
#[error("rate limit exceeded")]
RateLimit,
@@ -70,7 +70,7 @@ impl ReportableError for CancelError {
impl<P: CancellationPublisher> CancellationHandler<P> {
/// Run async action within an ephemeral session identified by [`CancelKeyData`].
pub(crate) fn get_session(self: Arc<Self>) -> Session<P> {
// HACK: We'd rather get the real backend_pid but postgres_client doesn't
// HACK: We'd rather get the real backend_pid but tokio_postgres doesn't
// expose it and we don't want to do another roundtrip to query
// for it. The client will be able to notice that this is not the
// actual backend_pid, but backend_pid is not used for anything

View File

@@ -6,8 +6,6 @@ use std::time::Duration;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use postgres_client::tls::MakeTlsConnect;
use postgres_client::{CancelToken, RawConnection};
use postgres_protocol::message::backend::NoticeResponseBody;
use pq_proto::StartupMessageParams;
use rustls::client::danger::ServerCertVerifier;
@@ -15,6 +13,8 @@ use rustls::crypto::ring;
use rustls::pki_types::InvalidDnsNameError;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres::{CancelToken, RawConnection};
use tracing::{debug, error, info, warn};
use crate::auth::parse_endpoint_param;
@@ -34,9 +34,9 @@ pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
#[derive(Debug, Error)]
pub(crate) enum ConnectionError {
/// This error doesn't seem to reveal any secrets; for instance,
/// `postgres_client::error::Kind` doesn't contain ip addresses and such.
/// `tokio_postgres::error::Kind` doesn't contain ip addresses and such.
#[error("{COULD_NOT_CONNECT}: {0}")]
Postgres(#[from] postgres_client::Error),
Postgres(#[from] tokio_postgres::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
@@ -99,18 +99,18 @@ impl ReportableError for ConnectionError {
}
/// A pair of `ClientKey` & `ServerKey` for `SCRAM-SHA-256`.
pub(crate) type ScramKeys = postgres_client::config::ScramKeys<32>;
pub(crate) type ScramKeys = tokio_postgres::config::ScramKeys<32>;
/// A config for establishing a connection to compute node.
/// Eventually, `postgres_client` will be replaced with something better.
/// Eventually, `tokio_postgres` will be replaced with something better.
/// Newtype allows us to implement methods on top of it.
#[derive(Clone)]
pub(crate) struct ConnCfg(Box<postgres_client::Config>);
#[derive(Clone, Default)]
pub(crate) struct ConnCfg(Box<tokio_postgres::Config>);
/// Creation and initialization routines.
impl ConnCfg {
pub(crate) fn new(host: String, port: u16) -> Self {
Self(Box::new(postgres_client::Config::new(host, port)))
pub(crate) fn new() -> Self {
Self::default()
}
/// Reuse password or auth keys from the other config.
@@ -124,9 +124,13 @@ impl ConnCfg {
}
}
pub(crate) fn get_host(&self) -> Host {
match self.0.get_host() {
postgres_client::config::Host::Tcp(s) => s.into(),
pub(crate) fn get_host(&self) -> Result<Host, WakeComputeError> {
match self.0.get_hosts() {
[tokio_postgres::config::Host::Tcp(s)] => Ok(s.into()),
// we should not have multiple address or unix addresses.
_ => Err(WakeComputeError::BadComputeAddress(
"invalid compute address".into(),
)),
}
}
@@ -156,7 +160,7 @@ impl ConnCfg {
// TODO: This is especially ugly...
if let Some(replication) = params.get("replication") {
use postgres_client::config::ReplicationMode;
use tokio_postgres::config::ReplicationMode;
match replication {
"true" | "on" | "yes" | "1" => {
self.replication_mode(ReplicationMode::Physical);
@@ -178,7 +182,7 @@ impl ConnCfg {
}
impl std::ops::Deref for ConnCfg {
type Target = postgres_client::Config;
type Target = tokio_postgres::Config;
fn deref(&self) -> &Self::Target {
&self.0
@@ -195,7 +199,7 @@ impl std::ops::DerefMut for ConnCfg {
impl ConnCfg {
/// Establish a raw TCP connection to the compute node.
async fn connect_raw(&self, timeout: Duration) -> io::Result<(SocketAddr, TcpStream, &str)> {
use postgres_client::config::Host;
use tokio_postgres::config::Host;
// wrap TcpStream::connect with timeout
let connect_with_timeout = |host, port| {
@@ -220,23 +224,46 @@ impl ConnCfg {
})
};
// We can't reuse connection establishing logic from `postgres_client` here,
// We can't reuse connection establishing logic from `tokio_postgres` here,
// because it has no means for extracting the underlying socket which we
// require for our business.
let port = self.0.get_port();
let host = self.0.get_host();
let mut connection_error = None;
let ports = self.0.get_ports();
let hosts = self.0.get_hosts();
// the ports array is supposed to have 0 entries, 1 entry, or as many entries as in the hosts array
if ports.len() > 1 && ports.len() != hosts.len() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"bad compute config, \
ports and hosts entries' count does not match: {:?}",
self.0
),
));
}
let host = match host {
Host::Tcp(host) => host.as_str(),
};
for (i, host) in hosts.iter().enumerate() {
let port = ports.get(i).or_else(|| ports.first()).unwrap_or(&5432);
let host = match host {
Host::Tcp(host) => host.as_str(),
};
match connect_once(host, port).await {
Ok((sockaddr, stream)) => Ok((sockaddr, stream, host)),
Err(err) => {
warn!("couldn't connect to compute node at {host}:{port}: {err}");
Err(err)
match connect_once(host, *port).await {
Ok((sockaddr, stream)) => return Ok((sockaddr, stream, host)),
Err(err) => {
// We can't throw an error here, as there might be more hosts to try.
warn!("couldn't connect to compute node at {host}:{port}: {err}");
connection_error = Some(err);
}
}
}
Err(connection_error.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
format!("bad compute config: {:?}", self.0),
)
}))
}
}
@@ -245,7 +272,7 @@ type RustlsStream = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>
pub(crate) struct PostgresConnection {
/// Socket connected to a compute node.
pub(crate) stream:
postgres_client::maybe_tls_stream::MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
tokio_postgres::maybe_tls_stream::MaybeTlsStream<tokio::net::TcpStream, RustlsStream>,
/// PostgreSQL connection parameters.
pub(crate) params: std::collections::HashMap<String, String>,
/// Query cancellation token.

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use futures::TryFutureExt;
use thiserror::Error;
use tokio_postgres::config::SslMode;
use tokio_postgres::Client;
use tracing::{error, info, info_span, warn, Instrument};
@@ -160,11 +161,11 @@ impl MockControlPlane {
}
async fn do_wake_compute(&self) -> Result<NodeInfo, WakeComputeError> {
let mut config = compute::ConnCfg::new(
self.endpoint.host_str().unwrap_or("localhost").to_owned(),
self.endpoint.port().unwrap_or(5432),
);
config.ssl_mode(postgres_client::config::SslMode::Disable);
let mut config = compute::ConnCfg::new();
config
.host(self.endpoint.host_str().unwrap_or("localhost"))
.port(self.endpoint.port().unwrap_or(5432))
.ssl_mode(SslMode::Disable);
let node = NodeInfo {
config,

View File

@@ -6,8 +6,8 @@ use std::time::Duration;
use ::http::header::AUTHORIZATION;
use ::http::HeaderName;
use futures::TryFutureExt;
use postgres_client::config::SslMode;
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{debug, info, info_span, warn, Instrument};
use super::super::messages::{ControlPlaneErrorMessage, GetRoleSecret, WakeCompute};
@@ -241,8 +241,8 @@ impl NeonControlPlaneClient {
// Don't set anything but host and port! This config will be cached.
// We'll set username and such later using the startup message.
// TODO: add more type safety (in progress).
let mut config = compute::ConnCfg::new(host.to_owned(), port);
config.ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
let mut config = compute::ConnCfg::new();
config.host(host).port(port).ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
let node = NodeInfo {
config,

View File

@@ -84,7 +84,7 @@ pub(crate) trait ReportableError: fmt::Display + Send + 'static {
fn get_error_kind(&self) -> ErrorKind;
}
impl ReportableError for postgres_client::error::Error {
impl ReportableError for tokio_postgres::error::Error {
fn get_error_kind(&self) -> ErrorKind {
if self.as_db_error().is_some() {
ErrorKind::Postgres

View File

@@ -1,10 +1,10 @@
use std::convert::TryFrom;
use std::sync::Arc;
use postgres_client::tls::MakeTlsConnect;
use rustls::pki_types::ServerName;
use rustls::ClientConfig;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::tls::MakeTlsConnect;
mod private {
use std::future::Future;
@@ -12,9 +12,9 @@ mod private {
use std::pin::Pin;
use std::task::{Context, Poll};
use postgres_client::tls::{ChannelBinding, TlsConnect};
use rustls::pki_types::ServerName;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_postgres::tls::{ChannelBinding, TlsConnect};
use tokio_rustls::client::TlsStream;
use tokio_rustls::TlsConnector;
@@ -59,7 +59,7 @@ mod private {
pub struct RustlsStream<S>(TlsStream<S>);
impl<S> postgres_client::tls::TlsStream for RustlsStream<S>
impl<S> tokio_postgres::tls::TlsStream for RustlsStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{

View File

@@ -86,7 +86,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
node_info: &control_plane::CachedNodeInfo,
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host();
let host = node_info.config.get_host()?;
let permit = self.locks.get_permit(&host).await?;
permit.release_result(node_info.connect(ctx, timeout).await)
}

View File

@@ -31,9 +31,9 @@ impl CouldRetry for io::Error {
}
}
impl CouldRetry for postgres_client::error::DbError {
impl CouldRetry for tokio_postgres::error::DbError {
fn could_retry(&self) -> bool {
use postgres_client::error::SqlState;
use tokio_postgres::error::SqlState;
matches!(
self.code(),
&SqlState::CONNECTION_FAILURE
@@ -43,9 +43,9 @@ impl CouldRetry for postgres_client::error::DbError {
)
}
}
impl ShouldRetryWakeCompute for postgres_client::error::DbError {
impl ShouldRetryWakeCompute for tokio_postgres::error::DbError {
fn should_retry_wake_compute(&self) -> bool {
use postgres_client::error::SqlState;
use tokio_postgres::error::SqlState;
// Here are errors that happens after the user successfully authenticated to the database.
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
!matches!(
@@ -61,21 +61,21 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
}
}
impl CouldRetry for postgres_client::Error {
impl CouldRetry for tokio_postgres::Error {
fn could_retry(&self) -> bool {
if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
io::Error::could_retry(io_err)
} else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
postgres_client::error::DbError::could_retry(db_err)
tokio_postgres::error::DbError::could_retry(db_err)
} else {
false
}
}
}
impl ShouldRetryWakeCompute for postgres_client::Error {
impl ShouldRetryWakeCompute for tokio_postgres::Error {
fn should_retry_wake_compute(&self) -> bool {
if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
postgres_client::error::DbError::should_retry_wake_compute(db_err)
tokio_postgres::error::DbError::should_retry_wake_compute(db_err)
} else {
// likely an IO error. Possible the compute has shutdown and the
// cache is stale.

View File

@@ -8,9 +8,9 @@ use std::fmt::Debug;
use bytes::{Bytes, BytesMut};
use futures::{SinkExt, StreamExt};
use postgres_client::tls::TlsConnect;
use postgres_protocol::message::frontend;
use tokio::io::{AsyncReadExt, DuplexStream};
use tokio_postgres::tls::TlsConnect;
use tokio_util::codec::{Decoder, Encoder};
use super::*;
@@ -158,8 +158,8 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
Scram::new("password").await?,
));
let _client_err = postgres_client::Config::new("test".to_owned(), 5432)
.channel_binding(postgres_client::config::ChannelBinding::Disable)
let _client_err = tokio_postgres::Config::new()
.channel_binding(tokio_postgres::config::ChannelBinding::Disable)
.user("user")
.dbname("db")
.password("password")
@@ -175,7 +175,7 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
async fn scram_auth_prefer_channel_binding() -> anyhow::Result<()> {
connect_failure(
Intercept::None,
postgres_client::config::ChannelBinding::Prefer,
tokio_postgres::config::ChannelBinding::Prefer,
)
.await
}
@@ -185,7 +185,7 @@ async fn scram_auth_prefer_channel_binding() -> anyhow::Result<()> {
async fn scram_auth_prefer_channel_binding_intercept() -> anyhow::Result<()> {
connect_failure(
Intercept::Methods,
postgres_client::config::ChannelBinding::Prefer,
tokio_postgres::config::ChannelBinding::Prefer,
)
.await
}
@@ -195,7 +195,7 @@ async fn scram_auth_prefer_channel_binding_intercept() -> anyhow::Result<()> {
async fn scram_auth_prefer_channel_binding_intercept_response() -> anyhow::Result<()> {
connect_failure(
Intercept::SASLResponse,
postgres_client::config::ChannelBinding::Prefer,
tokio_postgres::config::ChannelBinding::Prefer,
)
.await
}
@@ -205,7 +205,7 @@ async fn scram_auth_prefer_channel_binding_intercept_response() -> anyhow::Resul
async fn scram_auth_require_channel_binding() -> anyhow::Result<()> {
connect_failure(
Intercept::None,
postgres_client::config::ChannelBinding::Require,
tokio_postgres::config::ChannelBinding::Require,
)
.await
}
@@ -215,7 +215,7 @@ async fn scram_auth_require_channel_binding() -> anyhow::Result<()> {
async fn scram_auth_require_channel_binding_intercept() -> anyhow::Result<()> {
connect_failure(
Intercept::Methods,
postgres_client::config::ChannelBinding::Require,
tokio_postgres::config::ChannelBinding::Require,
)
.await
}
@@ -225,14 +225,14 @@ async fn scram_auth_require_channel_binding_intercept() -> anyhow::Result<()> {
async fn scram_auth_require_channel_binding_intercept_response() -> anyhow::Result<()> {
connect_failure(
Intercept::SASLResponse,
postgres_client::config::ChannelBinding::Require,
tokio_postgres::config::ChannelBinding::Require,
)
.await
}
async fn connect_failure(
intercept: Intercept,
channel_binding: postgres_client::config::ChannelBinding,
channel_binding: tokio_postgres::config::ChannelBinding,
) -> anyhow::Result<()> {
let (server, client, client_config, server_config) = proxy_mitm(intercept).await;
let proxy = tokio::spawn(dummy_proxy(
@@ -241,7 +241,7 @@ async fn connect_failure(
Scram::new("password").await?,
));
let _client_err = postgres_client::Config::new("test".to_owned(), 5432)
let _client_err = tokio_postgres::Config::new()
.channel_binding(channel_binding)
.user("user")
.dbname("db")

View File

@@ -7,13 +7,13 @@ use std::time::Duration;
use anyhow::{bail, Context};
use async_trait::async_trait;
use http::StatusCode;
use postgres_client::config::SslMode;
use postgres_client::tls::{MakeTlsConnect, NoTls};
use retry::{retry_after, ShouldRetryWakeCompute};
use rstest::rstest;
use rustls::crypto::ring;
use rustls::pki_types;
use tokio::io::DuplexStream;
use tokio_postgres::config::SslMode;
use tokio_postgres::tls::{MakeTlsConnect, NoTls};
use super::connect_compute::ConnectMechanism;
use super::retry::CouldRetry;
@@ -204,7 +204,7 @@ async fn handshake_tls_is_enforced_by_proxy() -> anyhow::Result<()> {
let (_, server_config) = generate_tls_config("generic-project-name.localhost", "localhost")?;
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), NoAuth));
let client_err = postgres_client::Config::new("test".to_owned(), 5432)
let client_err = tokio_postgres::Config::new()
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Disable)
@@ -233,7 +233,7 @@ async fn handshake_tls() -> anyhow::Result<()> {
generate_tls_config("generic-project-name.localhost", "localhost")?;
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), NoAuth));
let _conn = postgres_client::Config::new("test".to_owned(), 5432)
let _conn = tokio_postgres::Config::new()
.user("john_doe")
.dbname("earth")
.ssl_mode(SslMode::Require)
@@ -249,7 +249,7 @@ async fn handshake_raw() -> anyhow::Result<()> {
let proxy = tokio::spawn(dummy_proxy(client, None, NoAuth));
let _conn = postgres_client::Config::new("test".to_owned(), 5432)
let _conn = tokio_postgres::Config::new()
.user("john_doe")
.dbname("earth")
.options("project=generic-project-name")
@@ -296,8 +296,8 @@ async fn scram_auth_good(#[case] password: &str) -> anyhow::Result<()> {
Scram::new(password).await?,
));
let _conn = postgres_client::Config::new("test".to_owned(), 5432)
.channel_binding(postgres_client::config::ChannelBinding::Require)
let _conn = tokio_postgres::Config::new()
.channel_binding(tokio_postgres::config::ChannelBinding::Require)
.user("user")
.dbname("db")
.password(password)
@@ -320,8 +320,8 @@ async fn scram_auth_disable_channel_binding() -> anyhow::Result<()> {
Scram::new("password").await?,
));
let _conn = postgres_client::Config::new("test".to_owned(), 5432)
.channel_binding(postgres_client::config::ChannelBinding::Disable)
let _conn = tokio_postgres::Config::new()
.channel_binding(tokio_postgres::config::ChannelBinding::Disable)
.user("user")
.dbname("db")
.password("password")
@@ -348,7 +348,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
.map(char::from)
.collect();
let _client_err = postgres_client::Config::new("test".to_owned(), 5432)
let _client_err = tokio_postgres::Config::new()
.user("user")
.dbname("db")
.password(&password) // no password will match the mocked secret
@@ -546,7 +546,7 @@ impl TestControlPlaneClient for TestConnectMechanism {
fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
let node = NodeInfo {
config: compute::ConnCfg::new("test".to_owned(), 5432),
config: compute::ConnCfg::new(),
aux: MetricsAuxInfo {
endpoint_id: (&EndpointId::from("endpoint")).into(),
project_id: (&ProjectId::from("project")).into(),

View File

@@ -37,9 +37,9 @@ use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool:
Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
@@ -170,7 +170,7 @@ impl PoolingBackend {
conn_info: ConnInfo,
keys: ComputeCredentials,
force_new: bool,
) -> Result<Client<postgres_client::Client>, HttpConnError> {
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
let maybe_client = if force_new {
debug!("pool: pool is disabled");
None
@@ -256,7 +256,7 @@ impl PoolingBackend {
&self,
ctx: &RequestContext,
conn_info: ConnInfo,
) -> Result<Client<postgres_client::Client>, HttpConnError> {
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
if let Some(client) = self.local_pool.get(ctx, &conn_info)? {
return Ok(client);
}
@@ -315,7 +315,7 @@ impl PoolingBackend {
));
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let (client, connection) = config.connect(postgres_client::NoTls).await?;
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
drop(pause);
let pid = client.get_process_id();
@@ -360,7 +360,7 @@ pub(crate) enum HttpConnError {
#[error("pooled connection closed at inconsistent state")]
ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
#[error("could not connection to postgres in compute")]
PostgresConnectionError(#[from] postgres_client::Error),
PostgresConnectionError(#[from] tokio_postgres::Error),
#[error("could not connection to local-proxy in compute")]
LocalProxyConnectionError(#[from] LocalProxyConnError),
#[error("could not parse JWT payload")]
@@ -479,7 +479,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
}
struct TokioMechanism {
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
pool: Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -489,7 +489,7 @@ struct TokioMechanism {
#[async_trait]
impl ConnectMechanism for TokioMechanism {
type Connection = Client<postgres_client::Client>;
type Connection = Client<tokio_postgres::Client>;
type ConnectError = HttpConnError;
type Error = HttpConnError;
@@ -499,7 +499,7 @@ impl ConnectMechanism for TokioMechanism {
node_info: &CachedNodeInfo,
timeout: Duration,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host();
let host = node_info.config.get_host()?;
let permit = self.locks.get_permit(&host).await?;
let mut config = (*node_info.config).clone();
@@ -509,7 +509,7 @@ impl ConnectMechanism for TokioMechanism {
.connect_timeout(timeout);
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(postgres_client::NoTls).await;
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;
@@ -549,12 +549,16 @@ impl ConnectMechanism for HyperMechanism {
node_info: &CachedNodeInfo,
timeout: Duration,
) -> Result<Self::Connection, Self::ConnectError> {
let host = node_info.config.get_host();
let host = node_info.config.get_host()?;
let permit = self.locks.get_permit(&host).await?;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let port = node_info.config.get_port();
let port = *node_info.config.get_ports().first().ok_or_else(|| {
HttpConnError::WakeCompute(WakeComputeError::BadComputeAddress(
"local-proxy port missing on compute address".into(),
))
})?;
let res = connect_http2(&host, port, timeout).await;
drop(pause);
let (client, connection) = permit.release_result(res)?;

View File

@@ -5,11 +5,11 @@ use std::task::{ready, Poll};
use futures::future::poll_fn;
use futures::Future;
use postgres_client::tls::NoTlsStream;
use postgres_client::AsyncMessage;
use smallvec::SmallVec;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::AsyncMessage;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
#[cfg(test)]
@@ -58,7 +58,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
ctx: &RequestContext,
conn_info: ConnInfo,
client: C,
mut connection: postgres_client::Connection<TcpStream, NoTlsStream>,
mut connection: tokio_postgres::Connection<TcpStream, NoTlsStream>,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<C> {

View File

@@ -7,8 +7,8 @@ use std::time::Duration;
use dashmap::DashMap;
use parking_lot::RwLock;
use postgres_client::ReadyForQueryStatus;
use rand::Rng;
use tokio_postgres::ReadyForQueryStatus;
use tracing::{debug, info, Span};
use super::backend::HttpConnError;
@@ -683,7 +683,7 @@ pub(crate) trait ClientInnerExt: Sync + Send + 'static {
fn get_process_id(&self) -> i32;
}
impl ClientInnerExt for postgres_client::Client {
impl ClientInnerExt for tokio_postgres::Client {
fn is_closed(&self) -> bool {
self.is_closed()
}

View File

@@ -1,6 +1,6 @@
use postgres_client::types::{Kind, Type};
use postgres_client::Row;
use serde_json::{Map, Value};
use tokio_postgres::types::{Kind, Type};
use tokio_postgres::Row;
//
// Convert json non-string types to strings, so that they can be passed to Postgres
@@ -61,7 +61,7 @@ fn json_array_to_pg_array(value: &Value) -> Option<String> {
#[derive(Debug, thiserror::Error)]
pub(crate) enum JsonConversionError {
#[error("internal error compute returned invalid data: {0}")]
AsTextError(postgres_client::Error),
AsTextError(tokio_postgres::Error),
#[error("parse int error: {0}")]
ParseIntError(#[from] std::num::ParseIntError),
#[error("parse float error: {0}")]

View File

@@ -22,13 +22,13 @@ use indexmap::IndexMap;
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
use p256::ecdsa::{Signature, SigningKey};
use parking_lot::RwLock;
use postgres_client::tls::NoTlsStream;
use postgres_client::types::ToSql;
use postgres_client::AsyncMessage;
use serde_json::value::RawValue;
use signature::Signer;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::types::ToSql;
use tokio_postgres::AsyncMessage;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, warn, Instrument};
@@ -164,7 +164,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
ctx: &RequestContext,
conn_info: ConnInfo,
client: C,
mut connection: postgres_client::Connection<TcpStream, NoTlsStream>,
mut connection: tokio_postgres::Connection<TcpStream, NoTlsStream>,
key: SigningKey,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
@@ -280,7 +280,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
)
}
impl ClientInnerCommon<postgres_client::Client> {
impl ClientInnerCommon<tokio_postgres::Client> {
pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> {
if let ClientDataEnum::Local(local_data) = &mut self.data {
local_data.jti += 1;

View File

@@ -11,12 +11,12 @@ use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::http::{HeaderName, HeaderValue};
use hyper::{header, HeaderMap, Request, Response, StatusCode};
use postgres_client::error::{DbError, ErrorPosition, SqlState};
use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use pq_proto::StartupMessageParamsBuilder;
use serde::Serialize;
use serde_json::Value;
use tokio::time::{self, Instant};
use tokio_postgres::error::{DbError, ErrorPosition, SqlState};
use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use typed_json::json;
@@ -361,7 +361,7 @@ pub(crate) enum SqlOverHttpError {
#[error("invalid isolation level")]
InvalidIsolationLevel,
#[error("{0}")]
Postgres(#[from] postgres_client::Error),
Postgres(#[from] tokio_postgres::Error),
#[error("{0}")]
JsonConversion(#[from] JsonConversionError),
#[error("{0}")]
@@ -986,7 +986,7 @@ async fn query_to_json<T: GenericClient>(
// Manually drain the stream into a vector to leave row_stream hanging
// around to get a command tag. Also check that the response is not too
// big.
let mut rows: Vec<postgres_client::Row> = Vec::new();
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
while let Some(row) = row_stream.next().await {
let row = row?;
*current_size += row.body_len();
@@ -1063,13 +1063,13 @@ async fn query_to_json<T: GenericClient>(
}
enum Client {
Remote(conn_pool_lib::Client<postgres_client::Client>),
Local(conn_pool_lib::Client<postgres_client::Client>),
Remote(conn_pool_lib::Client<tokio_postgres::Client>),
Local(conn_pool_lib::Client<tokio_postgres::Client>),
}
enum Discard<'a> {
Remote(conn_pool_lib::Discard<'a, postgres_client::Client>),
Local(conn_pool_lib::Discard<'a, postgres_client::Client>),
Remote(conn_pool_lib::Discard<'a, tokio_postgres::Client>),
Local(conn_pool_lib::Discard<'a, tokio_postgres::Client>),
}
impl Client {
@@ -1080,7 +1080,7 @@ impl Client {
}
}
fn inner(&mut self) -> (&mut postgres_client::Client, Discard<'_>) {
fn inner(&mut self) -> (&mut tokio_postgres::Client, Discard<'_>) {
match self {
Client::Remote(client) => {
let (c, d) = client.inner();

View File

@@ -44,12 +44,12 @@ use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, TenantCreateRequest,
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::{
SecondaryProgress, TenantConfigRequest, TimelineArchivalConfigRequest,
@@ -468,7 +468,6 @@ struct ShardSplitParams {
policy: PlacementPolicy,
config: TenantConfig,
shard_ident: ShardIdentity,
preferred_az_id: Option<AvailabilityZone>,
}
// When preparing for a shard split, we may either choose to proceed with the split,
@@ -4104,7 +4103,7 @@ impl Service {
for parent_id in parent_ids {
let child_ids = parent_id.split(new_shard_count);
let (pageserver, generation, policy, parent_ident, config, preferred_az) = {
let (pageserver, generation, policy, parent_ident, config) = {
let mut old_state = tenants
.remove(&parent_id)
.expect("It was present, we just split it");
@@ -4123,7 +4122,6 @@ impl Service {
old_state.policy.clone(),
old_state.shard,
old_state.config.clone(),
old_state.preferred_az().cloned(),
)
};
@@ -4156,9 +4154,6 @@ impl Service {
};
child_state.generation = Some(generation);
child_state.config = config.clone();
if let Some(preferred_az) = &preferred_az {
child_state.set_preferred_az(preferred_az.clone());
}
// The child's TenantShard::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
@@ -4351,7 +4346,6 @@ impl Service {
let mut policy = None;
let mut config = None;
let mut shard_ident = None;
let mut preferred_az_id = None;
// Validate input, and calculate which shards we will create
let (old_shard_count, targets) =
{
@@ -4410,9 +4404,6 @@ impl Service {
if config.is_none() {
config = Some(shard.config.clone());
}
if preferred_az_id.is_none() {
preferred_az_id = shard.preferred_az().cloned();
}
if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
tracing::info!(
@@ -4483,7 +4474,6 @@ impl Service {
policy,
config,
shard_ident,
preferred_az_id,
})))
}
@@ -4506,7 +4496,6 @@ impl Service {
policy,
config,
shard_ident,
preferred_az_id,
} = *params;
// Drop any secondary locations: pageservers do not support splitting these, and in any case the
@@ -4580,7 +4569,7 @@ impl Service {
// Scheduling policies and preferred AZ do not carry through to children
scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
.unwrap(),
preferred_az_id: preferred_az_id.as_ref().map(|az| az.0.clone()),
preferred_az_id: None,
});
}
@@ -4700,6 +4689,47 @@ impl Service {
let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
// Now that we have scheduled the child shards, attempt to set their preferred AZ
// to that of the pageserver they've been attached on.
let preferred_azs = {
let locked = self.inner.read().unwrap();
child_locations
.iter()
.filter_map(|(tid, node_id, _stripe_size)| {
let az_id = locked
.nodes
.get(node_id)
.map(|n| n.get_availability_zone_id().clone())?;
Some((*tid, az_id))
})
.collect::<Vec<_>>()
};
let updated = self
.persistence
.set_tenant_shard_preferred_azs(preferred_azs)
.await
.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
"Failed to persist preferred az ids: {err}"
))
});
match updated {
Ok(updated) => {
let mut locked = self.inner.write().unwrap();
for (tid, az_id) in updated {
if let Some(shard) = locked.tenants.get_mut(&tid) {
shard.set_preferred_az(az_id);
}
}
}
Err(err) => {
tracing::warn!("Failed to persist preferred AZs after split: {err}");
}
}
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps, stripe_size) in child_locations {

View File

@@ -4,21 +4,17 @@ use itertools::Itertools;
use pageserver::tenant::checks::check_valid_layermap;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::manifest::TenantManifest;
use pageserver_api::shard::ShardIndex;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::shard::TenantShardId;
use crate::cloud_admin_api::BranchData;
use crate::metadata_stream::stream_listing;
use crate::{download_object_with_retries, RootTarget, TenantShardTimelineId};
use futures_util::StreamExt;
use pageserver::tenant::remote_timeline_client::{
parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
};
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use remote_storage::{GenericRemoteStorage, ListingObject, RemotePath};
@@ -531,132 +527,3 @@ async fn list_timeline_blobs_impl(
unknown_keys,
}))
}
pub(crate) struct RemoteTenantManifestInfo {
pub(crate) latest_generation: Option<Generation>,
pub(crate) manifests: Vec<(Generation, ListingObject)>,
}
pub(crate) enum ListTenantManifestResult {
WithErrors {
errors: Vec<(String, String)>,
#[allow(dead_code)]
unknown_keys: Vec<ListingObject>,
},
NoErrors(RemoteTenantManifestInfo),
}
/// Lists the tenant manifests in remote storage and parses the latest one, returning a [`ListTenantManifestResult`] object.
pub(crate) async fn list_tenant_manifests(
remote_client: &GenericRemoteStorage,
tenant_id: TenantShardId,
root_target: &RootTarget,
) -> anyhow::Result<ListTenantManifestResult> {
let mut errors = Vec::new();
let mut unknown_keys = Vec::new();
let mut tenant_root_target = root_target.tenant_root(&tenant_id);
let original_prefix = tenant_root_target.prefix_in_bucket.clone();
const TENANT_MANIFEST_STEM: &str = "tenant-manifest";
tenant_root_target.prefix_in_bucket += TENANT_MANIFEST_STEM;
tenant_root_target.delimiter = String::new();
let mut manifests: Vec<(Generation, ListingObject)> = Vec::new();
let prefix_str = &original_prefix
.strip_prefix("/")
.unwrap_or(&original_prefix);
let mut stream = std::pin::pin!(stream_listing(remote_client, &tenant_root_target));
'outer: while let Some(obj) = stream.next().await {
let (key, Some(obj)) = obj? else {
panic!("ListingObject not specified");
};
'err: {
// TODO a let chain would be nicer here.
let Some(name) = key.object_name() else {
break 'err;
};
if !name.starts_with(TENANT_MANIFEST_STEM) {
break 'err;
}
let Some(generation) = parse_remote_tenant_manifest_path(key.clone()) else {
break 'err;
};
tracing::debug!("tenant manifest {key}");
manifests.push((generation, obj));
continue 'outer;
}
tracing::info!("Listed an unknown key: {key}");
unknown_keys.push(obj);
}
if manifests.is_empty() {
tracing::debug!("No manifest for timeline.");
return Ok(ListTenantManifestResult::WithErrors {
errors,
unknown_keys,
});
}
if !unknown_keys.is_empty() {
errors.push(((*prefix_str).to_owned(), "unknown keys listed".to_string()));
return Ok(ListTenantManifestResult::WithErrors {
errors,
unknown_keys,
});
}
// Find the manifest with the highest generation
let (latest_generation, latest_listing_object) = manifests
.iter()
.max_by_key(|i| i.0)
.map(|(g, obj)| (*g, obj.clone()))
.unwrap();
let manifest_bytes =
match download_object_with_retries(remote_client, &latest_listing_object.key).await {
Ok(bytes) => bytes,
Err(e) => {
// It is possible that the tenant gets deleted in-between we list the objects
// and we download the manifest file.
errors.push((
latest_listing_object.key.get_path().as_str().to_owned(),
format!("failed to download tenant-manifest.json: {e}"),
));
return Ok(ListTenantManifestResult::WithErrors {
errors,
unknown_keys,
});
}
};
match TenantManifest::from_json_bytes(&manifest_bytes) {
Ok(_manifest) => {
return Ok(ListTenantManifestResult::NoErrors(
RemoteTenantManifestInfo {
latest_generation: Some(latest_generation),
manifests,
},
));
}
Err(parse_error) => errors.push((
latest_listing_object.key.get_path().as_str().to_owned(),
format!("tenant-manifest.json body parsing error: {parse_error}"),
)),
}
if errors.is_empty() {
errors.push((
(*prefix_str).to_owned(),
"Unexpected: no errors did not lead to a successfully parsed blob return".to_string(),
));
}
Ok(ListTenantManifestResult::WithErrors {
errors,
unknown_keys,
})
}

View File

@@ -2,16 +2,12 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;
use crate::checks::{
list_tenant_manifests, list_timeline_blobs, BlobDataParseResult, ListTenantManifestResult,
};
use crate::checks::{list_timeline_blobs, BlobDataParseResult};
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES};
use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::{
parse_remote_index_path, parse_remote_tenant_manifest_path, remote_layer_path,
};
use pageserver::tenant::remote_timeline_client::{parse_remote_index_path, remote_layer_path};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::IndexPart;
use pageserver_api::controller_api::TenantDescribeResponse;
@@ -29,7 +25,6 @@ use utils::id::{TenantId, TenantTimelineId};
#[derive(Serialize, Default)]
pub struct GcSummary {
indices_deleted: usize,
tenant_manifests_deleted: usize,
remote_storage_errors: usize,
controller_api_errors: usize,
ancestor_layers_deleted: usize,
@@ -39,14 +34,12 @@ impl GcSummary {
fn merge(&mut self, other: Self) {
let Self {
indices_deleted,
tenant_manifests_deleted,
remote_storage_errors,
ancestor_layers_deleted,
controller_api_errors,
} = other;
self.indices_deleted += indices_deleted;
self.tenant_manifests_deleted += tenant_manifests_deleted;
self.remote_storage_errors += remote_storage_errors;
self.ancestor_layers_deleted += ancestor_layers_deleted;
self.controller_api_errors += controller_api_errors;
@@ -359,69 +352,6 @@ async fn maybe_delete_index(
}
}
async fn maybe_delete_tenant_manifest(
remote_client: &GenericRemoteStorage,
min_age: &Duration,
latest_gen: Generation,
obj: &ListingObject,
mode: GcMode,
summary: &mut GcSummary,
) {
// Validation: we will only delete things that parse cleanly
let basename = obj.key.get_path().file_name().unwrap();
let Some(candidate_generation) =
parse_remote_tenant_manifest_path(RemotePath::from_string(basename).unwrap())
else {
// A strange key: we will not delete this because we don't understand it.
tracing::warn!("Bad index key");
return;
};
// Validation: we will only delete manifests more than one generation old, and in fact we
// should never be called with such recent generations.
if candidate_generation >= latest_gen {
tracing::warn!("Deletion candidate is >= latest generation, this is a bug!");
return;
} else if candidate_generation.next() == latest_gen {
tracing::warn!("Deletion candidate is >= latest generation - 1, this is a bug!");
return;
}
if !is_old_enough(min_age, obj, summary) {
return;
}
if matches!(mode, GcMode::DryRun) {
tracing::info!("Dry run: would delete this key");
return;
}
// All validations passed: erase the object
let cancel = CancellationToken::new();
match backoff::retry(
|| remote_client.delete(&obj.key, &cancel),
|_| false,
3,
MAX_RETRIES as u32,
"maybe_delete_tenant_manifest",
&cancel,
)
.await
{
None => {
unreachable!("Using a dummy cancellation token");
}
Some(Ok(_)) => {
tracing::info!("Successfully deleted tenant manifest");
summary.tenant_manifests_deleted += 1;
}
Some(Err(e)) => {
tracing::warn!("Failed to delete tenant manifest: {e}");
summary.remote_storage_errors += 1;
}
}
}
#[allow(clippy::too_many_arguments)]
async fn gc_ancestor(
remote_client: &GenericRemoteStorage,
@@ -521,100 +451,13 @@ async fn gc_ancestor(
Ok(())
}
async fn gc_tenant_manifests(
remote_client: &GenericRemoteStorage,
min_age: Duration,
target: &RootTarget,
mode: GcMode,
tenant_shard_id: TenantShardId,
) -> anyhow::Result<GcSummary> {
let mut gc_summary = GcSummary::default();
match list_tenant_manifests(remote_client, tenant_shard_id, target).await? {
ListTenantManifestResult::WithErrors {
errors,
unknown_keys: _,
} => {
for (_key, error) in errors {
tracing::warn!(%tenant_shard_id, "list_tenant_manifests: {error}");
}
}
ListTenantManifestResult::NoErrors(mut manifest_info) => {
let Some(latest_gen) = manifest_info.latest_generation else {
return Ok(gc_summary);
};
manifest_info
.manifests
.sort_by_key(|(generation, _obj)| *generation);
// skip the two latest generations (they don't neccessarily have to be 1 apart from each other)
let candidates = manifest_info.manifests.iter().rev().skip(2);
for (_generation, key) in candidates {
maybe_delete_tenant_manifest(
remote_client,
&min_age,
latest_gen,
key,
mode,
&mut gc_summary,
)
.instrument(
info_span!("maybe_delete_tenant_manifest", %tenant_shard_id, ?latest_gen, %key.key),
)
.await;
}
}
}
Ok(gc_summary)
}
async fn gc_timeline(
remote_client: &GenericRemoteStorage,
min_age: &Duration,
target: &RootTarget,
mode: GcMode,
ttid: TenantShardTimelineId,
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
let data = list_timeline_blobs(remote_client, ttid, target).await?;
let (index_part, latest_gen, candidates) = match &data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _s3_layers,
} => (index_part, *index_part_generation, data.unused_index_keys),
BlobDataParseResult::Relic => {
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
}
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
return Ok(summary);
}
};
accumulator.lock().unwrap().update(ttid, index_part);
for key in candidates {
maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
.await;
}
Ok(summary)
}
/// Physical garbage collection: removing unused S3 objects.
///
/// This is distinct from the garbage collection done inside the pageserver, which operates at a higher level
/// (keys, layers). This type of garbage collection is about removing:
/// - Objects that were uploaded but never referenced in the remote index (e.g. because of a shutdown between
/// uploading a layer and uploading an index)
/// - Index objects and tenant manifests from historic generations
/// - Index objects from historic generations
///
/// This type of GC is not necessary for correctness: rather it serves to reduce wasted storage capacity, and
/// make sure that object listings don't get slowed down by large numbers of garbage objects.
@@ -627,7 +470,6 @@ pub async fn pageserver_physical_gc(
) -> anyhow::Result<GcSummary> {
let (remote_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let remote_client = Arc::new(remote_client);
let tenants = if tenant_shard_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&remote_client, &target))
} else {
@@ -642,59 +484,59 @@ pub async fn pageserver_physical_gc(
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default()));
// Generate a stream of TenantTimelineId
enum GcSummaryOrContent<T> {
Content(T),
GcSummary(GcSummary),
}
let timelines = tenants.map_ok(|tenant_shard_id| {
let target_ref = &target;
let remote_client_ref = &remote_client;
async move {
let summaries_from_manifests = match gc_tenant_manifests(
remote_client_ref,
min_age,
target_ref,
mode,
tenant_shard_id,
)
.await
{
Ok(gc_summary) => vec![Ok(GcSummaryOrContent::<TenantShardTimelineId>::GcSummary(
gc_summary,
))],
Err(e) => {
tracing::warn!(%tenant_shard_id, "Error in gc_tenant_manifests: {e}");
Vec::new()
}
};
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id)
.await
.map(|stream| {
stream
.map_ok(GcSummaryOrContent::Content)
.chain(futures::stream::iter(summaries_from_manifests.into_iter()))
})
}
});
let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();
// Generate a stream of S3TimelineBlobData
async fn gc_timeline(
remote_client: &GenericRemoteStorage,
min_age: &Duration,
target: &RootTarget,
mode: GcMode,
ttid: TenantShardTimelineId,
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>,
) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default();
let data = list_timeline_blobs(remote_client, ttid, target).await?;
let (index_part, latest_gen, candidates) = match &data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _s3_layers,
} => (index_part, *index_part_generation, data.unused_index_keys),
BlobDataParseResult::Relic => {
// Post-deletion tenant location: don't try and GC it.
return Ok(summary);
}
BlobDataParseResult::Incorrect {
errors,
s3_layers: _,
} => {
// Our primary purpose isn't to report on bad data, but log this rather than skipping silently
tracing::warn!("Skipping timeline {ttid}, bad metadata: {errors:?}");
return Ok(summary);
}
};
accumulator.lock().unwrap().update(ttid, index_part);
for key in candidates {
maybe_delete_index(remote_client, min_age, latest_gen, &key, mode, &mut summary)
.instrument(info_span!("maybe_delete_index", %ttid, ?latest_gen, %key.key))
.await;
}
Ok(summary)
}
let mut summary = GcSummary::default();
// Drain futures for per-shard GC, populating accumulator as a side effect
{
let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid {
GcSummaryOrContent::Content(ttid) => futures::future::Either::Left(gc_timeline(
&remote_client,
&min_age,
&target,
mode,
ttid,
&accumulator,
)),
GcSummaryOrContent::GcSummary(gc_summary) => {
futures::future::Either::Right(futures::future::ok(gc_summary))
}
let timelines = timelines.map_ok(|ttid| {
gc_timeline(&remote_client, &min_age, &target, mode, ttid, &accumulator)
});
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));

View File

@@ -266,16 +266,6 @@ class NeonBenchmarker:
name = f"{self.PROPERTY_PREFIX}_{metric_name}"
if labels is None:
labels = {}
# Sometimes mypy can't catch non-numeric values,
# so adding a check here
try:
float(metric_value)
except ValueError as e:
raise ValueError(
f"`metric_value` (`{metric_value}`) must be a NUMERIC-friendly data type"
) from e
self.property_recorder(
name,
{

View File

@@ -1095,17 +1095,6 @@ class NeonEnv:
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
"no_sync": True,
}
# Batching (https://github.com/neondatabase/neon/issues/9377):
# enable batching by default in tests and benchmarks.
# Compat tests are exempt because old versions fail to parse the new config.
if not config.compatibility_neon_binpath:
ps_cfg["page_service_pipelining"] = {
"mode": "pipelined",
"execution": "concurrent-futures",
"max_batch_size": 32,
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_default_tenant_config_compaction_algorithm is not None:

View File

@@ -116,18 +116,21 @@ def test_throughput(
# name is not a metric, we just use it to identify the test easily in the `test_...[...]`` notation
}
)
# For storing configuration as a metric, insert a fake 0 with labels with actual data
params.update({"pipelining_config": (0, {"labels": dataclasses.asdict(pipelining_config)})})
params.update(
{
f"pipelining_config.{k}": (v, {})
for k, v in dataclasses.asdict(pipelining_config).items()
}
)
log.info("params: %s", params)
for param, (value, kwargs) in params.items():
zenbenchmark.record(
param,
metric_value=float(value),
metric_value=value,
unit=kwargs.pop("unit", ""),
report=MetricReport.TEST_PARAM,
labels=kwargs.pop("labels", None),
**kwargs,
)

View File

@@ -142,9 +142,10 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape:
# start without gc so we can time compaction with less noise; use shorter
# period for compaction so it starts earlier
def patch_default_tenant_config(config):
tenant_config = config.setdefault("tenant_config", {})
tenant_config = config.get("tenant_config", {})
tenant_config["compaction_period"] = "3s"
tenant_config["gc_period"] = "0s"
config["tenant_config"] = tenant_config
env.pageserver.edit_config_toml(patch_default_tenant_config)
env.pageserver.start(

View File

@@ -90,7 +90,6 @@ def test_sharded_ingest(
# Start the endpoint.
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
# Ingest data and measure WAL volume and duration.
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -105,8 +104,6 @@ def test_sharded_ingest(
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
# Record metrics.
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
@@ -155,7 +152,3 @@ def test_sharded_ingest(
log.info(f"WAL ingested by each pageserver {ingested_by_ps}")
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
# The pageservers can take a long time to shut down gracefully, presumably due to the upload
# queue or compactions or something. Just stop them immediately, we don't care.
env.stop(immediate=True)

View File

@@ -29,8 +29,6 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*: layer load failed, assuming permanent failure:.*",
".*failed to get checkpoint bytes.*",
".*failed get control bytes.*",
]
)
@@ -77,7 +75,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match="failed to get checkpoint bytes") as err:
with pytest.raises(Exception, match="get_values_reconstruct_data for layer ") as err:
pg1.start()
log.info(
f"As expected, compute startup failed for timeline {tenant1}/{timeline1} with corrupt layers: {err}"

View File

@@ -62,8 +62,9 @@ def test_min_resident_size_override_handling(
if config_level_override is not None:
def set_min_resident_size(config):
tenant_config = config.setdefault("tenant_config", {})
tenant_config = config.get("tenant_config", {})
tenant_config["min_resident_size_override"] = config_level_override
config["tenant_config"] = tenant_config
env.pageserver.edit_config_toml(set_min_resident_size)
env.pageserver.stop()

View File

@@ -33,9 +33,7 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
conf={
"compaction_period": f"{compaction_period}s",
"timeline_get_throttle": {
"task_kinds": [
"PageRequestHandler"
], # any non-empty array will do here https://github.com/neondatabase/neon/pull/9962
"task_kinds": ["PageRequestHandler"],
"initial": 0,
"refill_interval": "100ms",
"refill_amount": int(rate_limit_rps / 10),
@@ -118,6 +116,7 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
timeout=compaction_period,
)
log.info("the smgr metric includes throttle time")
smgr_query_seconds_post = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query)
assert smgr_query_seconds_post is not None
throttled_usecs_post = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query)
@@ -126,14 +125,13 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
actual_throttled_usecs = throttled_usecs_post - throttled_usecs_pre
actual_throttled_secs = actual_throttled_usecs / 1_000_000
log.info("validate that the metric doesn't include throttle wait time")
assert (
duration_secs >= 10 * actual_smgr_query_seconds
), "smgr metrics should not include throttle wait time"
log.info("validate that the throttling wait time metrics is correct")
pytest.approx(duration_secs, 0.1) == actual_smgr_query_seconds
), "smgr metrics include throttle wait time"
smgr_ex_throttle = actual_smgr_query_seconds - actual_throttled_secs
assert smgr_ex_throttle > 0
assert (
pytest.approx(actual_throttled_secs + actual_smgr_query_seconds, 0.1) == duration_secs
duration_secs > 10 * smgr_ex_throttle
), "most of the time in this test is spent throttled because the rate-limit's contribution to latency dominates"
@@ -183,8 +181,7 @@ def test_throttle_fair_config_is_settable_but_ignored_in_config_toml(
"""
def set_tenant_config(ps_cfg):
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config["timeline_get_throttle"] = throttle_config_with_field_fair_set
ps_cfg["tenant_config"] = {"timeline_get_throttle": throttle_config_with_field_fair_set}
neon_env_builder.pageserver_config_override = set_tenant_config
env = neon_env_builder.init_start()

View File

@@ -1,62 +0,0 @@
import pytest
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion
#
# Test that pgstat statistic is preserved across sessions
#
def test_pgstat(neon_simple_env: NeonEnv):
env = neon_simple_env
if env.pg_version == PgVersion.V14:
pytest.skip("PG14 doesn't support pgstat statistic persistence")
env.pageserver.allowed_errors.append(".*this timeline is using deprecated aux file policy V1.*")
n = 10000
endpoint = env.endpoints.create_start("main")
con = endpoint.connect()
cur = con.cursor()
cur.execute("create table t(x integer)")
cur.execute(f"insert into t values (generate_series(1,{n}))")
cur.execute("vacuum analyze t")
cur.execute("select sum(x) from t")
cur.execute("update t set x=x+1")
cur.execute("select pg_stat_force_next_flush()")
cur.execute(
"select seq_scan,seq_tup_read,n_tup_ins,n_tup_upd,n_live_tup,n_dead_tup, vacuum_count,analyze_count from pg_stat_user_tables"
)
rec = cur.fetchall()[0]
assert rec == (2, n * 2, n, n, n * 2, n, 1, 1)
endpoint.stop()
endpoint.start()
con = endpoint.connect()
cur = con.cursor()
cur.execute(
"select seq_scan,seq_tup_read,n_tup_ins,n_tup_upd,n_live_tup,n_dead_tup, vacuum_count,analyze_count from pg_stat_user_tables"
)
rec = cur.fetchall()[0]
assert rec == (2, n * 2, n, n, n * 2, n, 1, 1)
cur.execute("update t set x=x+1")
# stop without checkpoint
endpoint.stop(mode="immediate")
endpoint.start()
con = endpoint.connect()
cur = con.cursor()
cur.execute(
"select seq_scan,seq_tup_read,n_tup_ins,n_tup_upd,n_live_tup,n_dead_tup, vacuum_count,analyze_count from pg_stat_user_tables"
)
rec = cur.fetchall()[0]
# pgstat information should be discarded in case of abnormal termination
assert rec == (0, 0, 0, 0, 0, 0, 0, 0)

View File

@@ -3057,11 +3057,7 @@ def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
for shard in shards:
attached_to = shard["node_attached"]
expected_az = env.get_pageserver(attached_to).az_id
# The scheduling optimization logic is not yet AZ-aware, so doesn't succeed
# in putting the tenant shards in the preferred AZ.
# To be fixed in https://github.com/neondatabase/neon/pull/9916
# assert shard["preferred_az_id"] == expected_az
assert shard["preferred_az_id"] == expected_az
@run_only_on_default_postgres("Postgres version makes no difference here")

View File

@@ -790,8 +790,6 @@ def test_timeline_retain_lsn(
[
".*initial size calculation failed: PageRead.MissingKey.could not find data for key.*",
".*page_service_conn_main.*could not find data for key.*",
".*failed to get checkpoint bytes.*",
".*failed get control bytes.*",
]
)
if offload_child is None or "no-restart" not in offload_child:
@@ -837,117 +835,3 @@ def test_timeline_retain_lsn(
with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint:
sum = endpoint.safe_psql("SELECT sum(key) from foo where v < 51200")
assert sum == pre_branch_sum
def test_timeline_offload_generations(neon_env_builder: NeonEnvBuilder):
"""
Test for scrubber deleting old generations of manifests
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
# Turn off gc and compaction loops: we want to issue them manually for better reliability
tenant_id, root_timeline_id = env.create_tenant(
conf={
"gc_period": "0s",
"compaction_period": "0s",
"checkpoint_distance": f"{1024 ** 2}",
}
)
# Create a branch and archive it
child_timeline_id = env.create_branch("test_archived_branch_persisted", tenant_id)
with env.endpoints.create_start(
"test_archived_branch_persisted", tenant_id=tenant_id
) as endpoint:
endpoint.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'data_content')",
"INSERT INTO foo SELECT FROM generate_series(1,512)",
]
)
sum = endpoint.safe_psql("SELECT sum(key) from foo where key % 3 = 2")
last_flush_lsn_upload(env, endpoint, tenant_id, child_timeline_id)
assert_prefix_not_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/",
)
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,
prefix=f"tenants/{str(tenant_id)}/tenant-manifest",
)
ps_http.timeline_archival_config(
tenant_id,
child_timeline_id,
state=TimelineArchivalState.ARCHIVED,
)
def timeline_offloaded_api(timeline_id: TimelineId) -> bool:
# TODO add a proper API to check if a timeline has been offloaded or not
return not any(
timeline["timeline_id"] == str(timeline_id)
for timeline in ps_http.timeline_list(tenant_id=tenant_id)
)
def child_offloaded():
ps_http.timeline_offload(tenant_id=tenant_id, timeline_id=child_timeline_id)
assert timeline_offloaded_api(child_timeline_id)
wait_until(child_offloaded)
assert timeline_offloaded_api(child_timeline_id)
assert not timeline_offloaded_api(root_timeline_id)
# Reboot the pageserver a bunch of times, do unoffloads, offloads
for i in range(5):
env.pageserver.stop()
env.pageserver.start()
assert timeline_offloaded_api(child_timeline_id)
assert not timeline_offloaded_api(root_timeline_id)
ps_http.timeline_archival_config(
tenant_id,
child_timeline_id,
state=TimelineArchivalState.UNARCHIVED,
)
assert not timeline_offloaded_api(child_timeline_id)
if i % 2 == 0:
with env.endpoints.create_start(
"test_archived_branch_persisted", tenant_id=tenant_id
) as endpoint:
sum_again = endpoint.safe_psql("SELECT sum(key) from foo where key % 3 = 2")
assert sum == sum_again
ps_http.timeline_archival_config(
tenant_id,
child_timeline_id,
state=TimelineArchivalState.ARCHIVED,
)
wait_until(child_offloaded)
#
# Now ensure that scrubber runs will clean up old generations' manifests.
#
# Sleep some amount larger than min_age_secs
time.sleep(3)
# Ensure that min_age_secs has a deletion impeding effect
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=3600, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] == 0
assert gc_summary["tenant_manifests_deleted"] == 0
gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=1, mode="full")
assert gc_summary["remote_storage_errors"] == 0
assert gc_summary["indices_deleted"] > 0
assert gc_summary["tenant_manifests_deleted"] > 0

View File

@@ -1,15 +1,15 @@
{
"v17": [
"17.2",
"7864df7b68fa7e0a7b0234b2e5dd2cdb7772aa08"
"a10d95be67265e0f10a422ba0457f5a7af01de71"
],
"v16": [
"16.6",
"b6b298e88848f0dbb7d4a077fe70bcd4573ee7ca"
"dff6615a8e48a10bb17a03fa3c00635f1ace7a92"
],
"v15": [
"15.10",
"b352942e9c08e5a5350f5c1662c118ce96ea11c5"
"972e325e62b455957adbbdd8580e31275bb5b8c9"
],
"v14": [
"14.15",