Compare commits

..

7 Commits

Author SHA1 Message Date
Heikki Linnakangas
394e675db3 Print checkpoint key contents "pagectl print-layer-file" 2024-01-27 14:49:59 +02:00
Heikki Linnakangas
507c4bd7b4 Fix the test
Now it passes on my laptop at least
2024-01-20 19:59:56 +02:00
Heikki Linnakangas
caa3a42cdb Apply the hack for all timelines of the target tenant
This gives us more flexibility to try it on a branch first
2024-01-20 19:26:53 +02:00
Heikki Linnakangas
be8a6bcdb4 Fix the test 2024-01-20 19:04:27 +02:00
Heikki Linnakangas
71c7ba756d Fix typos and formatting in test, per 'ruff' 2024-01-20 18:25:04 +02:00
Heikki Linnakangas
47f898710c Fix formatting 2024-01-20 18:24:49 +02:00
Heikki Linnakangas
cafd4b52ca Retroactively fix the nextXid on a known broken timeline
This one particular timeline in production hit the nextXid bug. Add a
one-off hack that will fix the nextXid on that particular timeline.
2024-01-20 18:24:49 +02:00
18 changed files with 388 additions and 376 deletions

4
Cargo.lock generated
View File

@@ -5031,9 +5031,9 @@ dependencies = [
[[package]]
name = "shlex"
version = "1.3.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "signal-hook"

View File

@@ -143,8 +143,6 @@ RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouti
#########################################################################################
FROM build-deps AS plv8-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
RUN apt update && \
apt install -y ninja-build python3-dev libncurses5 binutils clang
@@ -619,7 +617,6 @@ RUN wget https://github.com/theory/pg-semver/archive/refs/tags/v0.32.1.tar.gz -O
FROM build-deps AS pg-embedding-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
@@ -782,8 +779,6 @@ RUN wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_5.tar.
#
#########################################################################################
FROM build-deps AS neon-pg-ext-build
ARG PG_VERSION
# Public extensions
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=postgis-build /sfcgal/* /

View File

@@ -329,8 +329,8 @@ impl CheckPoint {
///
/// Returns 'true' if the XID was updated.
pub fn update_next_xid(&mut self, xid: u32) -> bool {
// nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
let mut new_xid = std::cmp::max(xid.wrapping_add(1), pg_constants::FIRST_NORMAL_TRANSACTION_ID);
// nextXid should nw greater than any XID in WAL, so increment provided XID and check for wraparround.
let mut new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID);
// To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
new_xid =

View File

@@ -1832,7 +1832,7 @@ const CONTROLFILE_KEY: Key = Key {
field6: 0,
};
const CHECKPOINT_KEY: Key = Key {
pub const CHECKPOINT_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,

View File

@@ -18,16 +18,13 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use nix::unistd::Pid;
use pageserver_api::models;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use tokio::signal::unix::Signal;
use std::fmt;
use std::os::unix::process::CommandExt;
use storage_broker::BrokerClientChannel;
use tokio::io::BufReader;
use tokio::runtime::Handle;
@@ -3749,17 +3746,7 @@ async fn run_initdb(
let _permit = INIT_DB_SEMAPHORE.acquire().await;
let mut initdb_command_std = std::process::Command::new(&initdb_bin_path);
// The process_group function is unstable as tokio's MSRV is 1.63,
// and process_group was stabilized in 1.64. This is the officially
// recommended workaround.
// Setting pgroup to 0 makes the pgroupid be that of the child, as explained in
// https://github.com/microsoft/WSL/issues/2997 (unrelated bug, but explains it)
// We use need the pgid to be set for pkill to work during cancellation, to also
// get the child processes of initdb.
initdb_command_std.process_group(0);
let mut initdb_command = tokio::process::Command::from(initdb_command_std)
let initdb_command = tokio::process::Command::new(&initdb_bin_path)
.args(["-D", initdb_target_dir.as_ref()])
.args(["-U", &conf.superuser])
.args(["-E", "utf8"])
@@ -3780,25 +3767,13 @@ async fn run_initdb(
.spawn()?;
tokio::select! {
exit_status = initdb_command.wait() => {
let exit_status = exit_status?;
if !exit_status.success() {
let mut stderr = initdb_command.stderr.take().unwrap();
let mut stderr_vec = Vec::new();
tokio::io::copy(&mut stderr, &mut stderr_vec).await?;
return Err(InitdbError::Failed(exit_status, stderr_vec));
initdb_output = initdb_command.wait_with_output() => {
let initdb_output = initdb_output?;
if !initdb_output.status.success() {
return Err(InitdbError::Failed(initdb_output.status, initdb_output.stderr));
}
}
_ = cancel.cancelled() => {
if let Some(pid) = initdb_command.id() {
warn!("Doing killpg...");
nix::sys::signal::killpg(Pid::from_raw(pid as i32), Signal::SIGKILL)
.map_err(|e| InitdbError::Other(anyhow::anyhow!(e)))?;
initdb_command.wait().await?;
} else {
warn!("Couldn't obtain initdb pid, killing initdb process only.");
initdb_command.kill().await?;
}
return Err(InitdbError::Cancelled);
}
}

View File

@@ -884,7 +884,7 @@ impl DeltaLayerInner {
let keys = self.load_keys(ctx).await?;
async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
let desc = match val {
@@ -905,14 +905,30 @@ impl DeltaLayerInner {
}
for entry in keys {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(val, ctx).await {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(&val, ctx).await {
Ok(desc) => desc,
Err(err) => {
format!("ERROR: {err}")
}
};
println!(" key {key} at {lsn}: {desc}");
use crate::pgdatadir_mapping::CHECKPOINT_KEY;
use postgres_ffi::CheckPoint;
if key == CHECKPOINT_KEY
{
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
let val = Value::des(&buf)?;
match val {
Value::Image(img) => {
let checkpoint = CheckPoint::decode(&img)?;
println!(" CHECKPOINT: {:?}", checkpoint);
}
Value::WalRecord(_rec) => {
format!(" unexpected walrecord for checkpoint key");
}
}
}
}
Ok(())

View File

@@ -26,8 +26,11 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use std::str::FromStr;
use anyhow::{bail, Context, Result};
use bytes::{Buf, Bytes, BytesMut};
use hex::FromHex;
use tracing::*;
use utils::failpoint_support;
@@ -43,9 +46,10 @@ use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::v14::{bindings::FullTransactionId, CheckPoint};
use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::id::TenantId;
use utils::lsn::Lsn;
pub struct WalIngest {
@@ -108,6 +112,55 @@ impl WalIngest {
self.checkpoint_modified = true;
}
// BEGIN ONE-OFF HACK
//
// We had a bug where we incorrectly passed 0 to update_next_xid(). That was
// harmless as long as nextXid was < 2^31, because 0 looked like a very old
// XID. But once nextXid reaches 2^31, 0 starts to look like a very new XID, and
// we incorrectly bumped up nextXid to the next epoch, to value '1:1024'
//
// We have one known timeline in production where that happened. This is a one-off
// fix to fix that damage. The last WAL record on that timeline as of this writing
// is this:
//
// rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 35A/E32D86D8, prev 35A/E32D86B0, desc: RUNNING_XACTS nextXid 2325447052 latestCompletedXid 2325447051 oldestRunningXid 2325447052
//
// So on that particular timeline, before that LSN, fix the incorrectly set
// nextXid to the nextXid value from that record, plus 1000 to give some safety
// margin.
// For testing this hack, this failpoint temporarily re-introduces the bug that
// was fixed
fn reintroduce_bug_failpoint_activated() -> bool {
fail::fail_point!("reintroduce-nextxid-update-bug", |_| { true });
false
}
if decoded.xl_xid == pg_constants::INVALID_TRANSACTION_ID
&& reintroduce_bug_failpoint_activated()
&& self.checkpoint.update_next_xid(decoded.xl_xid)
{
info!(
"failpoint: Incorrectly updated nextXid at LSN {} to {}",
lsn, self.checkpoint.nextXid.value
);
self.checkpoint_modified = true;
}
if self.checkpoint.nextXid.value == 4294968320 && // 1::1024, the incorrect value
modification.tline.tenant_shard_id.tenant_id == TenantId::from_hex("df254570a4f603805528b46b0d45a76c").unwrap() &&
lsn < Lsn::from_str("35A/E32D9000").unwrap() &&
!reintroduce_bug_failpoint_activated()
{
// This is the last nextXid value from the last RUNNING_XACTS record, at the
// end of the WAL as of this writing.
self.checkpoint.nextXid = FullTransactionId {
value: 2325447052 + 1000,
};
self.checkpoint_modified = true;
warn!("nextXid fixed by one-off hack at LSN {}", lsn);
}
// END ONE-OFF HACK
match decoded.xl_rmid {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages

View File

@@ -5,7 +5,7 @@ edition.workspace = true
license.workspace = true
[features]
default = []
default = ["testing"]
testing = []
[dependencies]

View File

@@ -10,7 +10,6 @@ use crate::auth::credentials::check_peer_addr_is_in_list;
use crate::auth::validate_password_and_exchange;
use crate::cache::Cached;
use crate::console::errors::GetAuthInfoError;
use crate::console::provider::ConsoleBackend;
use crate::console::AuthSecret;
use crate::context::RequestMonitoring;
use crate::proxy::connect_compute::handle_try_wake;
@@ -44,8 +43,11 @@ use tracing::{error, info, warn};
/// this helps us provide the credentials only to those auth
/// backends which require them for the authentication process.
pub enum BackendType<'a, T> {
/// Cloud API (V2).
Console(Cow<'a, ConsoleBackend>, T),
/// Current Cloud API (V2).
Console(Cow<'a, console::provider::neon::Api>, T),
/// Local mock of Cloud API (V2).
#[cfg(feature = "testing")]
Postgres(Cow<'a, console::provider::mock::Api>, T),
/// Authentication via a web browser.
Link(Cow<'a, url::ApiUrl>),
#[cfg(test)]
@@ -62,15 +64,9 @@ impl std::fmt::Display for BackendType<'_, ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use BackendType::*;
match self {
Console(api, _) => match &**api {
ConsoleBackend::Console(endpoint) => {
fmt.debug_tuple("Console").field(&endpoint.url()).finish()
}
#[cfg(feature = "testing")]
ConsoleBackend::Postgres(endpoint) => {
fmt.debug_tuple("Postgres").field(&endpoint.url()).finish()
}
},
Console(endpoint, _) => fmt.debug_tuple("Console").field(&endpoint.url()).finish(),
#[cfg(feature = "testing")]
Postgres(endpoint, _) => fmt.debug_tuple("Postgres").field(&endpoint.url()).finish(),
Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
#[cfg(test)]
Test(_) => fmt.debug_tuple("Test").finish(),
@@ -85,6 +81,8 @@ impl<T> BackendType<'_, T> {
use BackendType::*;
match self {
Console(c, x) => Console(Cow::Borrowed(c), x),
#[cfg(feature = "testing")]
Postgres(c, x) => Postgres(Cow::Borrowed(c), x),
Link(c) => Link(Cow::Borrowed(c)),
#[cfg(test)]
Test(x) => Test(*x),
@@ -100,6 +98,8 @@ impl<'a, T> BackendType<'a, T> {
use BackendType::*;
match self {
Console(c, x) => Console(c, f(x)),
#[cfg(feature = "testing")]
Postgres(c, x) => Postgres(c, f(x)),
Link(c) => Link(c),
#[cfg(test)]
Test(x) => Test(x),
@@ -114,6 +114,8 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
use BackendType::*;
match self {
Console(c, x) => x.map(|x| Console(c, x)),
#[cfg(feature = "testing")]
Postgres(c, x) => x.map(|x| Postgres(c, x)),
Link(c) => Ok(Link(c)),
#[cfg(test)]
Test(x) => Ok(Test(x)),
@@ -323,6 +325,8 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
match self {
Console(_, user_info) => user_info.project.clone(),
#[cfg(feature = "testing")]
Postgres(_, user_info) => user_info.project.clone(),
Link(_) => Some("link".into()),
#[cfg(test)]
Test(_) => Some("test".into()),
@@ -335,6 +339,8 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
match self {
Console(_, user_info) => &user_info.user,
#[cfg(feature = "testing")]
Postgres(_, user_info) => &user_info.user,
Link(_) => "link",
#[cfg(test)]
Test(_) => "test",
@@ -365,6 +371,19 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
.await?;
(cache_info, BackendType::Console(api, user_info))
}
#[cfg(feature = "testing")]
Postgres(api, user_info) => {
info!(
user = &*user_info.user,
project = user_info.project(),
"performing authentication using a local postgres instance"
);
let (cache_info, user_info) =
auth_and_wake_compute(ctx, &*api, user_info, client, allow_cleartext, config)
.await?;
(cache_info, BackendType::Postgres(api, user_info))
}
// NOTE: this auth backend doesn't use client credentials.
Link(url) => {
info!("performing link authentication");
@@ -395,6 +414,8 @@ impl BackendType<'_, ComputeUserInfo> {
use BackendType::*;
match self {
Console(api, user_info) => api.get_allowed_ips(ctx, user_info).await,
#[cfg(feature = "testing")]
Postgres(api, user_info) => api.get_allowed_ips(ctx, user_info).await,
Link(_) => Ok(Cached::new_uncached(Arc::new(vec![]))),
#[cfg(test)]
Test(x) => Ok(Cached::new_uncached(Arc::new(x.get_allowed_ips()?))),
@@ -411,6 +432,8 @@ impl BackendType<'_, ComputeUserInfo> {
match self {
Console(api, user_info) => api.wake_compute(ctx, user_info).map_ok(Some).await,
#[cfg(feature = "testing")]
Postgres(api, user_info) => api.wake_compute(ctx, user_info).map_ok(Some).await,
Link(_) => Ok(None),
#[cfg(test)]
Test(x) => x.wake_compute().map(Some),

View File

@@ -57,31 +57,24 @@ pub(super) async fn authenticate(
link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<NodeInfo> {
// registering waiter can fail if we get unlucky with rng.
// just try again.
let (psql_session_id, waiter) = loop {
let psql_session_id = new_psql_session_id();
match console::mgmt::get_waiter(&psql_session_id) {
Ok(waiter) => break (psql_session_id, waiter),
Err(_e) => continue,
}
};
let psql_session_id = new_psql_session_id();
let span = info_span!("link", psql_session_id = &psql_session_id);
let greeting = hello_message(link_uri, &psql_session_id);
// Give user a URL to spawn a new database.
info!(parent: &span, "sending the auth URL to the user");
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&Be::CLIENT_ENCODING)?
.write_message(&Be::NoticeResponse(&greeting))
.await?;
let db_info = console::mgmt::with_waiter(psql_session_id, |waiter| async {
// Give user a URL to spawn a new database.
info!(parent: &span, "sending the auth URL to the user");
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&Be::CLIENT_ENCODING)?
.write_message(&Be::NoticeResponse(&greeting))
.await?;
// Wait for web console response (see `mgmt`).
info!(parent: &span, "waiting for console's reply...");
let db_info = waiter.await.map_err(LinkAuthError::from)?;
// Wait for web console response (see `mgmt`).
info!(parent: &span, "waiting for console's reply...");
waiter.await?.map_err(LinkAuthError::AuthFailed)
})
.await?;
client.write_message_noflush(&Be::NoticeResponse("Connecting to database."))?;

View File

@@ -249,19 +249,12 @@ async fn main() -> anyhow::Result<()> {
}
if let auth::BackendType::Console(api, _) = &config.auth_backend {
match &**api {
proxy::console::provider::ConsoleBackend::Console(api) => {
let cache = api.caches.project_info.clone();
if let Some(url) = args.redis_notifications {
info!("Starting redis notifications listener ({url})");
maintenance_tasks
.spawn(notifications::task_main(url.to_owned(), cache.clone()));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
#[cfg(feature = "testing")]
proxy::console::provider::ConsoleBackend::Postgres(_) => {}
let cache = api.caches.project_info.clone();
if let Some(url) = args.redis_notifications {
info!("Starting redis notifications listener ({url})");
maintenance_tasks.spawn(notifications::task_main(url.to_owned(), cache.clone()));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
let maintenance = loop {
@@ -358,15 +351,13 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let endpoint = http::Endpoint::new(url, http::new_client(rate_limiter_config));
let api = console::provider::neon::Api::new(endpoint, caches, locks);
let api = console::provider::ConsoleBackend::Console(api);
auth::BackendType::Console(Cow::Owned(api), ())
}
#[cfg(feature = "testing")]
AuthBackend::Postgres => {
let url = args.auth_endpoint.parse()?;
let api = console::provider::mock::Api::new(url);
let api = console::provider::ConsoleBackend::Postgres(api);
auth::BackendType::Console(Cow::Owned(api), ())
auth::BackendType::Postgres(Cow::Owned(api), ())
}
AuthBackend::Link => {
let url = args.uri.parse()?;

View File

@@ -266,7 +266,7 @@ impl ProjectInfoCacheImpl {
tokio::time::interval(self.config.gc_interval / (self.cache.shards().len()) as u32);
loop {
interval.tick().await;
if self.cache.len() < self.config.size {
if self.cache.len() <= self.config.size {
// If there are not too many entries, wait until the next gc cycle.
continue;
}

View File

@@ -13,10 +13,16 @@ use tracing::{error, info, info_span, Instrument};
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
/// Give caller an opportunity to wait for the cloud's reply.
pub fn get_waiter(
pub async fn with_waiter<R, T, E>(
psql_session_id: impl Into<String>,
) -> Result<Waiter<'static, ComputeReady>, waiters::RegisterError> {
CPLANE_WAITERS.register(psql_session_id.into())
action: impl FnOnce(Waiter<'static, ComputeReady>) -> R,
) -> Result<T, E>
where
R: std::future::Future<Output = Result<T, E>>,
E: From<waiters::RegisterError>,
{
let waiter = CPLANE_WAITERS.register(psql_session_id.into())?;
action(waiter).await
}
pub fn notify(psql_session_id: &str, msg: ComputeReady) -> Result<(), waiters::NotifyError> {
@@ -71,7 +77,7 @@ async fn handle_connection(socket: TcpStream) -> Result<(), QueryError> {
}
/// A message received by `mgmt` when a compute node is ready.
pub type ComputeReady = DatabaseInfo;
pub type ComputeReady = Result<DatabaseInfo, String>;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
@@ -96,7 +102,7 @@ fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), Qu
let _enter = span.enter();
info!("got response: {:?}", resp.result);
match notify(resp.session_id, resp.result) {
match notify(resp.session_id, Ok(resp.result)) {
Ok(()) => {
pgb.write_message_noflush(&SINGLE_COL_ROWDESC)?
.write_message_noflush(&BeMessage::DataRow(&[Some(b"ok")]))?

View File

@@ -248,75 +248,23 @@ pub trait Api {
async fn get_role_secret(
&self,
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
creds: &ComputeUserInfo,
) -> Result<Option<CachedRoleSecret>, errors::GetAuthInfoError>;
async fn get_allowed_ips(
&self,
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
creds: &ComputeUserInfo,
) -> Result<CachedAllowedIps, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
creds: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}
#[derive(Clone)]
pub enum ConsoleBackend {
/// Current Cloud API (V2).
Console(neon::Api),
/// Local mock of Cloud API (V2).
#[cfg(feature = "testing")]
Postgres(mock::Api),
}
#[async_trait]
impl Api for ConsoleBackend {
async fn get_role_secret(
&self,
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<Option<CachedRoleSecret>, errors::GetAuthInfoError> {
use ConsoleBackend::*;
match self {
Console(api) => api.get_role_secret(ctx, user_info).await,
#[cfg(feature = "testing")]
Postgres(api) => api.get_role_secret(ctx, user_info).await,
}
}
async fn get_allowed_ips(
&self,
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedAllowedIps, errors::GetAuthInfoError> {
use ConsoleBackend::*;
match self {
Console(api) => api.get_allowed_ips(ctx, user_info).await,
#[cfg(feature = "testing")]
Postgres(api) => api.get_allowed_ips(ctx, user_info).await,
}
}
async fn wake_compute(
&self,
ctx: &mut RequestMonitoring,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
use ConsoleBackend::*;
match self {
Console(api) => api.wake_compute(ctx, user_info).await,
#[cfg(feature = "testing")]
Postgres(api) => api.wake_compute(ctx, user_info).await,
}
}
}
/// Various caches for [`console`](super).
pub struct ApiCaches {
/// Cache for the `wake_compute` API method.

View File

@@ -160,6 +160,8 @@ where
let node_info = loop {
let wake_res = match user_info {
auth::BackendType::Console(api, user_info) => api.wake_compute(ctx, user_info).await,
#[cfg(feature = "testing")]
auth::BackendType::Postgres(api, user_info) => api.wake_compute(ctx, user_info).await,
// nothing to do?
auth::BackendType::Link(_) => return Err(err.into()),
// test backend

View File

@@ -2998,6 +2998,23 @@ class Endpoint(PgProtocol):
):
self.stop()
def log_contains(self, pattern: str) -> Optional[str]:
"""Check that the compute log contains a line that matches the given regex"""
logfile = self.endpoint_path() / "compute.log"
if not logfile.exists():
log.warning(f"Skipping log check: {logfile} does not exist")
return None
contains_re = re.compile(pattern)
with logfile.open("r") as f:
for line in f:
if contains_re.search(line):
# found it!
return line
return None
# Checkpoints running endpoint and returns pg_wal size in MB.
def get_pg_wal_size(self):
log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')

View File

@@ -3,10 +3,12 @@ import os
import time
from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_wal_insert_lsn
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId, TimelineId
@@ -98,7 +100,7 @@ def test_import_at_2bil(
vanilla_pg.safe_psql("CREATE TABLE t (t text);")
vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")
endpoint_id = "ep-import_from_vanilla"
branch_name = "import_from_vanilla"
tenant = TenantId.generate()
timeline = TimelineId.generate()
@@ -138,7 +140,7 @@ def test_import_at_2bil(
"--timeline-id",
str(timeline),
"--node-name",
endpoint_id,
branch_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
@@ -157,7 +159,8 @@ def test_import_at_2bil(
wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
endpoint = env.endpoints.create_start(
endpoint_id,
branch_name,
endpoint_id="ep-import_from_vanilla",
tenant_id=tenant,
config_lines=[
"log_autovacuum_min_duration = 0",
@@ -166,7 +169,6 @@ def test_import_at_2bil(
)
assert endpoint.safe_psql("select count(*) from t") == [(1,)]
# Ok, consume
conn = endpoint.connect()
cur = conn.cursor()
@@ -218,3 +220,212 @@ def test_import_at_2bil(
cur = conn.cursor()
cur.execute("SELECT count(*) from t")
assert cur.fetchone() == (10000 + 1,)
# This is a followup to the test_import_at_2bil test.
#
# Use a failpoint to reintroduce the bug that test_import_at_2bil also
# tests. Then, after the damage has been done, clear the failpoint to
# fix the bug. Check that the one-off hack that we added for a particular
# timeline that hit this in production fixes the broken timeline.
def test_one_off_hack_for_nextxid_bug(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_distrib_dir: Path,
pg_bin,
vanilla_pg,
):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
env.pageserver.allowed_errors.append(".*nextXid fixed by one-off hack.*")
# We begin with the old bug still present, to create a broken timeline
ps_http.configure_failpoints(("reintroduce-nextxid-update-bug", "return(true)"))
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
# Reset the vanilla Postgres instance to somewhat before 2 billion transactions,
# and around the same LSN as with the production timeline.
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
cmd = [
pg_resetwal_path,
"--next-transaction-id=2129920000",
"-l",
"000000010000035A000000E0",
"-D",
str(vanilla_pg.pgdatadir),
]
pg_bin.run_capture(cmd, env=psql_env)
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql(
"""create table tt as select 'long string to consume some space' || g
from generate_series(1,300000) g"""
)
assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
vanilla_pg.safe_psql("CREATE TABLE t (t text);")
vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")
branch_name = "import_from_vanilla"
# This is the tenant/timeline that the one-off hack targets
tenant = "df254570a4f603805528b46b0d45a76c"
timeline = TimelineId.generate()
env.pageserver.tenant_create(tenant)
# Take basebackup
basebackup_dir = os.path.join(test_output_dir, "basebackup")
base_tar = os.path.join(basebackup_dir, "base.tar")
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
os.mkdir(basebackup_dir)
vanilla_pg.safe_psql("CHECKPOINT")
pg_bin.run(
[
"pg_basebackup",
"-F",
"tar",
"-d",
vanilla_pg.connstr(),
"-D",
basebackup_dir,
]
)
# Get start_lsn and end_lsn
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
manifest = json.load(f)
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
def import_tar(base, wal):
env.neon_cli.raw_cli(
[
"timeline",
"import",
"--tenant-id",
str(tenant),
"--timeline-id",
str(timeline),
"--node-name",
branch_name,
"--base-lsn",
start_lsn,
"--base-tarfile",
base,
"--end-lsn",
end_lsn,
"--wal-tarfile",
wal,
"--pg-version",
env.pg_version,
]
)
# Importing correct backup works
import_tar(base_tar, wal_tar)
wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
endpoint = env.endpoints.create_start(
branch_name,
endpoint_id="ep-import_from_vanilla",
tenant_id=tenant,
config_lines=[
"log_autovacuum_min_duration = 0",
"autovacuum_naptime='5 s'",
],
)
assert endpoint.safe_psql("select count(*) from t") == [(1,)]
conn = endpoint.connect()
cur = conn.cursor()
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
# Advance nextXid to the target XID, which is somewhat above the 2
# billion mark.
while True:
xid = int(query_scalar(cur, "SELECT txid_current()"))
log.info(f"xid now {xid}")
# Consume 10k transactons at a time until we get to 2^31 - 200k
if xid < (2325447052 - 100000):
cur.execute("select test_consume_xids(50000);")
elif xid < 2325447052 - 10000:
cur.execute("select test_consume_xids(5000);")
else:
break
# Run a bunch of real INSERTs to cross over the 2 billion mark
# Use a begin-exception block to have a separate sub-XID for each insert.
cur.execute(
"""
do $$
begin
for i in 1..10000 loop
-- Use a begin-exception block to generate a new subtransaction on each iteration
begin
insert into t values (i);
exception when others then
raise 'not expected %', sqlerrm;
end;
end loop;
end;
$$;
"""
)
# A checkpoint writes a WAL record with xl_xid=0. Many other WAL
# records would have the same effect.
cur.execute("checkpoint")
# Ok, the nextXid in the pageserver at this LSN should now be incorrectly
# set to 1:1024. Remember this LSN.
broken_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
# Ensure that the broken checkpoint data has reached permanent storage
ps_http.timeline_checkpoint(tenant, timeline)
wait_for_upload(ps_http, tenant, timeline, broken_lsn)
# Now fix the bug, and generate some WAL with XIDs
ps_http.configure_failpoints(("reintroduce-nextxid-update-bug", "off"))
cur.execute("INSERT INTO t VALUES ('after fix')")
fixed_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
log.info(f"nextXid was broken by {broken_lsn}, and fixed again by {fixed_lsn}")
# Stop the original endpoint, we don't need it anymore.
endpoint.stop()
# Test that we cannot start a new endpoint at the broken LSN.
env.neon_cli.create_branch(
"at-broken-lsn", branch_name, ancestor_start_lsn=broken_lsn, tenant_id=tenant
)
endpoint_broken = env.endpoints.create(
"at-broken-lsn",
endpoint_id="ep-at-broken-lsn",
tenant_id=tenant,
)
with pytest.raises(RuntimeError, match="Postgres exited unexpectedly with code 1"):
endpoint_broken.start()
assert endpoint_broken.log_contains(
'Could not open file "pg_xact/0000": No such file or directory'
)
# But after the bug was fixed, the one-off hack fixed the timeline,
# and a later LSN works.
env.neon_cli.create_branch(
"at-fixed-lsn", branch_name, ancestor_start_lsn=fixed_lsn, tenant_id=tenant
)
endpoint_fixed = env.endpoints.create_start(
"at-fixed-lsn", endpoint_id="ep-at-fixed-lsn", tenant_id=tenant
)
conn = endpoint_fixed.connect()
cur = conn.cursor()
cur.execute("SELECT count(*) from t")
# One "inserted in vanilla" row, 10000 in the DO-loop, and one "after fix" row
assert cur.fetchone() == (1 + 10000 + 1,)

View File

@@ -556,216 +556,6 @@ def test_tenant_delete_concurrent(
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
def test_tenant_delete_races_timeline_creation_01(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_02(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_03(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_04(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_05(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_06(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_07(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_08(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_09(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_10(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_11(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_12(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_13(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_14(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_15(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_16(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_17(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_18(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_19(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_20(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_21(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_22(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_23(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_24(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_25(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_26(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_27(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_28(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_29(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation_30(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
test_tenant_delete_races_timeline_creation(neon_env_builder, pg_bin)
def test_tenant_delete_races_timeline_creation(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
@@ -788,9 +578,6 @@ def test_tenant_delete_races_timeline_creation(
".*POST.*Cancelled request finished with an error: InternalServerError\\(.*ancelled"
)
# This can occur sometimes.
CONFLICT_MESSAGE = ".*Precondition failed: Invalid state Stopping. Expected Active or Broken.*"
env.pageserver.allowed_errors.extend(
[
# lucky race with stopping from flushing a layer we fail to schedule any uploads
@@ -799,9 +586,6 @@ def test_tenant_delete_races_timeline_creation(
".*POST.*/timeline.* request was dropped before completing",
# Timeline creation runs into this error
CANCELLED_ERROR,
# Timeline deletion can run into this error during deletion
CONFLICT_MESSAGE,
".*tenant_delete_handler.*still waiting, taking longer than expected.*",
]
)
@@ -859,8 +643,6 @@ def test_tenant_delete_races_timeline_creation(
except PageserverApiException:
pass
os.wait(4)
# Physical deletion should have happened
assert_prefix_empty(
neon_env_builder.pageserver_remote_storage,