Compare commits

..

1 Commit

Author: Christian Schwarz
SHA1: 4d7b504c70
Message: WIP: move PageServerConf methods that return workdir subdirs into own type

I thought this might be handy in https://github.com/neondatabase/neon/pull/5536
because initially I planned to download directly into the local filesystem.
I switched to using `remote_storage` instead in that PR.

But these changes here might still be useful.

Date: 2023-10-11 18:14:12 +02:00
23 changed files with 307 additions and 615 deletions

View File

@@ -224,8 +224,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \

View File

@@ -4,7 +4,7 @@
//! allowing multiple api users to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::{borrow::Cow, sync::Arc};
use std::sync::Arc;
use anyhow::Context;
use aws_config::{
@@ -556,20 +556,6 @@ impl RemoteStorage for S3Bucket {
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
// Log a bounded number of the errors within the response:
// these requests can carry 1000 keys so logging each one
// would be too verbose, especially as errors may lead us
// to retry repeatedly.
const LOG_UP_TO_N_ERRORS: usize = 10;
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
tracing::warn!(
"DeleteObjects key {} failed: {}: {}",
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
e.message.as_ref().map(Cow::from).unwrap_or("".into())
);
}
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()

View File

@@ -11,7 +11,10 @@ use std::sync::{Arc, Barrier};
use bytes::{Buf, Bytes};
use pageserver::{
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
config::PageServerConf,
repository::Key,
walrecord::NeonWalRecord,
walredo::{PostgresRedoManager, WalRedoError},
};
use utils::{id::TenantId, lsn::Lsn};
@@ -149,7 +152,7 @@ impl Drop for JoinOnDrop {
}
}
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()>
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError>
where
I: IntoIterator<Item = Request>,
{
@@ -157,7 +160,7 @@ where
input.into_iter().try_for_each(|req| {
let page = req.execute(manager)?;
assert_eq!(page.remaining(), 8192);
anyhow::Ok(())
Ok::<_, WalRedoError>(())
})
}
@@ -470,7 +473,7 @@ struct Request {
}
impl Request {
fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
fn execute(self, manager: &PostgresRedoManager) -> Result<Bytes, WalRedoError> {
use pageserver::walredo::WalRedoManager;
let Request {

View File

@@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
use std::ops::Deref;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
@@ -153,7 +154,7 @@ pub struct PageServerConf {
// that during unit testing, because the current directory is global
// to the process but different unit tests work on different
// repositories.
pub workdir: Utf8PathBuf,
pub workdir: PageserverConfWorkdir,
pub pg_distrib_dir: Utf8PathBuf,
@@ -241,6 +242,45 @@ impl<T> BuilderValue<T> {
}
}
#[derive(Clone, PartialEq, Eq)]
pub struct PageserverConfWorkdir {
d: Utf8PathBuf,
}
impl Deref for PageServerConf {
type Target = PageserverConfWorkdir;
fn deref(&self) -> &Self::Target {
&self.workdir
}
}
impl Deref for PageserverConfWorkdir {
type Target = Utf8Path;
fn deref(&self) -> &Self::Target {
&self.d
}
}
impl AsRef<std::path::Path> for PageserverConfWorkdir {
fn as_ref(&self) -> &std::path::Path {
self.d.as_ref()
}
}
impl std::fmt::Debug for PageserverConfWorkdir {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.d.fmt(f)
}
}
impl std::fmt::Display for PageserverConfWorkdir {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.d.fmt(f)
}
}
// needed to simplify config construction
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
@@ -257,7 +297,7 @@ struct PageServerConfigBuilder {
page_cache_size: BuilderValue<usize>,
max_file_descriptors: BuilderValue<usize>,
workdir: BuilderValue<Utf8PathBuf>,
workdir: BuilderValue<PageserverConfWorkdir>,
pg_distrib_dir: BuilderValue<Utf8PathBuf>,
@@ -310,7 +350,9 @@ impl Default for PageServerConfigBuilder {
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
workdir: Set(Utf8PathBuf::new()),
workdir: Set(PageserverConfWorkdir {
d: Utf8PathBuf::new(),
}),
pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
env::current_dir().expect("cannot access current directory"),
)
@@ -399,7 +441,7 @@ impl PageServerConfigBuilder {
}
pub fn workdir(&mut self, workdir: Utf8PathBuf) {
self.workdir = BuilderValue::Set(workdir)
self.workdir = BuilderValue::Set(PageserverConfWorkdir { d: workdir })
}
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
@@ -599,17 +641,17 @@ impl PageServerConfigBuilder {
}
}
impl PageServerConf {
impl PageserverConfWorkdir {
//
// Repository paths, relative to workdir.
//
pub fn tenants_path(&self) -> Utf8PathBuf {
self.workdir.join(TENANTS_SEGMENT_NAME)
self.d.join(TENANTS_SEGMENT_NAME)
}
pub fn deletion_prefix(&self) -> Utf8PathBuf {
self.workdir.join("deletion")
self.d.join("deletion")
}
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
@@ -692,7 +734,7 @@ impl PageServerConf {
}
pub fn traces_path(&self) -> Utf8PathBuf {
self.workdir.join("traces")
self.d.join("traces")
}
pub fn trace_path(
@@ -716,9 +758,11 @@ impl PageServerConf {
/// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
remote_path.with_base(&self.workdir)
remote_path.with_base(&self.d)
}
}
impl PageServerConf {
//
// Postgres distribution paths
//
@@ -958,6 +1002,9 @@ impl PageServerConf {
}
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
let repo_dir = PageserverConfWorkdir { d: repo_dir };
let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
PageServerConf {
@@ -1495,10 +1542,12 @@ threshold = "20m"
Ok(())
}
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(PageserverConfWorkdir, Utf8PathBuf)> {
let tempdir_path = tempdir.path();
let workdir = tempdir_path.join("workdir");
let workdir = PageserverConfWorkdir {
d: tempdir_path.join("workdir"),
};
fs::create_dir_all(&workdir)?;
let pg_distrib_dir = tempdir_path.join("pg_distrib");

View File

@@ -153,7 +153,7 @@ impl FlushOp {
#[derive(Clone, Debug)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
impl DeletionQueueClient {
pub(crate) fn broken() -> Self {
// Channels whose receivers are immediately dropped.
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, _rx) = tokio::sync::mpsc::channel(1);
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
Self {
tx,
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
fn do_push<T>(
async fn do_push<T>(
&self,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
queue: &tokio::sync::mpsc::Sender<T>,
msg: T,
) -> Result<(), DeletionQueueError> {
match queue.send(msg) {
match queue.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// This shouldn't happen, we should shut down all tenants before
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
}
}
pub(crate) fn recover(
pub(crate) async fn recover(
&self,
attached_tenants: HashMap<TenantId, Generation>,
) -> Result<(), DeletionQueueError> {
@@ -453,6 +453,7 @@ impl DeletionQueueClient {
&self.tx,
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
)
.await
}
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
@@ -525,21 +526,6 @@ impl DeletionQueueClient {
return self.flush_immediate().await;
}
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
/// the ListValidator channel is an unbounded channel.
///
/// This can be merged into push_layers when we remove the Generation-less mode
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
pub(crate) fn push_layers_sync(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
) -> Result<(), DeletionQueueError> {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
@@ -553,16 +539,17 @@ impl DeletionQueueClient {
objects: Vec::new(),
}),
)
.await
}
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
async fn do_flush<T>(
&self,
queue: &tokio::sync::mpsc::UnboundedSender<T>,
queue: &tokio::sync::mpsc::Sender<T>,
msg: T,
rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), DeletionQueueError> {
self.do_push(queue, msg)?;
self.do_push(queue, msg).await?;
if rx.await.is_err() {
// This shouldn't happen if tenants are shut down before deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
@@ -583,18 +570,6 @@ impl DeletionQueueClient {
.await
}
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
/// detach where flushing is nice but not necessary.
///
/// This function provides no guarantees of work being done.
pub fn flush_advisory(&self) {
let (flush_op, _) = FlushOp::new();
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
}
// Wait until all previous deletions are executed
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
@@ -611,7 +586,9 @@ impl DeletionQueueClient {
// Flush any immediate-mode deletions (the above backend flush will only flush
// the executor if deletions had flowed through the backend)
debug!("flush_execute: flushing execution...");
self.flush_immediate().await?;
let (flush_op, rx) = FlushOp::new();
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
.await?;
debug!("flush_execute: finished flushing execution...");
Ok(())
}
@@ -666,10 +643,8 @@ impl DeletionQueue {
where
C: ControlPlaneGenerationsApi + Send + Sync,
{
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
// enough to avoid this taking pathologically large amount of memory.
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
@@ -982,7 +957,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
@@ -1050,7 +1025,7 @@ mod test {
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
// Generation that the control plane thinks is current
let latest_generation = Generation::new(0xdeadbeef);
@@ -1107,7 +1082,7 @@ mod test {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
client.recover(HashMap::new()).await?;
let tenant_id = ctx.harness.tenant_id;
@@ -1170,7 +1145,9 @@ mod test {
drop(client);
ctx.restart().await;
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
client
.recover(HashMap::from([(tenant_id, now_generation)]))
.await?;
info!("Flush-executing");
client.flush_execute().await?;
@@ -1196,7 +1173,7 @@ pub(crate) mod mock {
};
pub struct ConsumerState {
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
}
@@ -1273,7 +1250,7 @@ pub(crate) mod mock {
}
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
executed: Arc<AtomicUsize>,
remote_storage: Option<GenericRemoteStorage>,
@@ -1283,7 +1260,7 @@ pub(crate) mod mock {
impl MockDeletionQueue {
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let (tx, rx) = tokio::sync::mpsc::channel(16384);
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
let executed = Arc::new(AtomicUsize::new(0));
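
The switch in this file between tokio's unbounded_channel and a bounded channel is what forces do_push, recover and push_layers to become async: a bounded Sender::send must be awaited when the queue is full, which is how backpressure is applied. A minimal, self-contained sketch of that difference follows, with a hypothetical Msg type and capacity rather than the deletion-queue types.

use tokio::sync::mpsc;

#[derive(Debug)]
struct Msg(u32);

#[tokio::main]
async fn main() {
    // Unbounded: send() is synchronous and never applies backpressure,
    // so it can be called from non-async code, but memory use is unbounded.
    let (utx, mut urx) = mpsc::unbounded_channel::<Msg>();
    utx.send(Msg(1)).expect("receiver alive");

    // Bounded: send() is async and waits when the channel is at capacity,
    // which is the backpressure behaviour a bounded queue relies on.
    let (tx, mut rx) = mpsc::channel::<Msg>(2);
    tx.send(Msg(2)).await.expect("receiver alive");
    tx.send(Msg(3)).await.expect("receiver alive");
    // A third send would now wait until the receiver drains a message.

    println!("{:?}", urx.recv().await);
    println!("{:?}", rx.recv().await);
    println!("{:?}", rx.recv().await);
}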

View File

@@ -13,7 +13,6 @@ use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;
use crate::metrics;
@@ -64,19 +63,7 @@ impl Deleter {
Err(anyhow::anyhow!("failpoint hit"))
});
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
// logging path for issues deleting objects.
backoff::retry(
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|_| false,
3,
10,
"executing deletion batch",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
self.remote_storage.delete_objects(&self.accumulator).await
}
/// Block until everything in accumulator has been executed
@@ -101,10 +88,7 @@ impl Deleter {
self.accumulator.clear();
}
Err(e) => {
if self.cancel.is_cancelled() {
return Err(DeletionQueueError::ShuttingDown);
}
warn!("DeleteObjects request failed: {e:#}, will continue trying");
warn!("DeleteObjects request failed: {e:#}, will retry");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["execute"])

View File

@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
@@ -111,7 +111,7 @@ impl ListWriter {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
cancel: CancellationToken,
) -> Self {

View File

@@ -77,7 +77,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
.iter()
.map(|v| v.parse().unwrap())
.collect::<Vec<_>>();
@@ -136,7 +136,9 @@ impl From<PageReconstructError> for ApiError {
PageReconstructError::AncestorStopping(_) => {
ApiError::ResourceUnavailable(format!("{pre}").into())
}
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}
}
}
}
@@ -162,6 +164,9 @@ impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::NotActive(_) => {
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
@@ -570,14 +575,9 @@ async fn tenant_detach_handler(
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
json_response(StatusCode::OK, ())
}
@@ -1034,7 +1034,7 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
mgr::detach_tenant(conf, tenant_id, true)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());

View File

@@ -45,7 +45,6 @@ use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
@@ -209,7 +208,7 @@ pub struct Tenant {
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
@@ -2077,15 +2076,6 @@ impl Tenant {
}
}
}
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
self.tenant_conf
.read()
.unwrap()
.location
.attach_mode
.clone()
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -2755,11 +2745,6 @@ impl Tenant {
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
let start_lsn = start_lsn.unwrap_or_else(|| {
let lsn = src_timeline.get_last_record_lsn();
@@ -2767,6 +2752,11 @@ impl Tenant {
lsn
});
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// Create a placeholder for the new branch. This will error
// out if the new timeline ID is already in use.
let timeline_uninit_mark = {
@@ -3448,8 +3438,11 @@ pub mod harness {
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::{
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
walredo::WalRedoManager,
config::PageServerConf,
repository::Key,
tenant::Tenant,
walrecord::NeonWalRecord,
walredo::{WalRedoError, WalRedoManager},
};
use super::*;
@@ -3622,7 +3615,7 @@ pub mod harness {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, WalRedoError> {
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,

View File

@@ -31,7 +31,7 @@ use super::{
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
pub enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
@@ -376,7 +376,7 @@ impl DeleteTenantFlow {
Ok(())
}
pub(crate) async fn should_resume_deletion(
pub async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,

View File

@@ -24,7 +24,7 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
@@ -50,7 +50,7 @@ use super::TenantSharedResources;
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub(crate) enum TenantSlot {
pub enum TenantSlot {
Attached(Arc<Tenant>),
Secondary,
}
@@ -206,7 +206,8 @@ async fn init_load_generations(
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())?;
.recover(generations.clone())
.await?;
}
Ok(Some(generations))
@@ -481,7 +482,7 @@ pub(crate) fn schedule_local_tenant_processing(
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
pub(crate) async fn shutdown_all_tenants() {
pub async fn shutdown_all_tenants() {
shutdown_all_tenants0(&TENANTS).await
}
@@ -593,7 +594,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
// caller will log how long we took
}
pub(crate) async fn create_tenant(
pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -628,14 +629,14 @@ pub(crate) async fn create_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum SetNewTenantConfigError {
pub enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
}
pub(crate) async fn set_new_tenant_config(
pub async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -694,18 +695,6 @@ pub(crate) async fn upsert_location(
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
match tenant.get_attach_mode() {
AttachmentMode::Single | AttachmentMode::Multi => {
// Before we leave our state as the presumed holder of the latest generation,
// flush any outstanding deletions to reduce the risk of leaking objects.
deletion_queue_client.flush_advisory()
}
AttachmentMode::Stale => {
// If we're stale there's not point trying to flush deletions
}
};
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
@@ -776,7 +765,7 @@ pub(crate) async fn upsert_location(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum GetTenantError {
pub enum GetTenantError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is not active")]
@@ -792,7 +781,7 @@ pub(crate) enum GetTenantError {
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub(crate) async fn get_tenant(
pub async fn get_tenant(
tenant_id: TenantId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
@@ -817,7 +806,7 @@ pub(crate) async fn get_tenant(
}
}
pub(crate) async fn delete_tenant(
pub async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_id: TenantId,
@@ -826,7 +815,7 @@ pub(crate) async fn delete_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTimelineError {
pub enum DeleteTimelineError {
#[error("Tenant {0}")]
Tenant(#[from] GetTenantError),
@@ -834,7 +823,7 @@ pub(crate) enum DeleteTimelineError {
Timeline(#[from] crate::tenant::DeleteTimelineError),
}
pub(crate) async fn delete_timeline(
pub async fn delete_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
_ctx: &RequestContext,
@@ -845,29 +834,23 @@ pub(crate) async fn delete_timeline(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantStateError {
pub enum TenantStateError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is stopping")]
IsStopping(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
pub(crate) async fn detach_tenant(
pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
detach_ignored,
deletion_queue_client,
)
.await?;
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
let task_tenant_id = None;
@@ -892,7 +875,6 @@ async fn detach_tenant0(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
@@ -904,10 +886,6 @@ async fn detach_tenant0(
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
deletion_queue_client.flush_advisory();
// Ignored tenants are not present in memory and will bail the removal from memory operation.
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
@@ -924,7 +902,7 @@ async fn detach_tenant0(
removal_result
}
pub(crate) async fn load_tenant(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -961,7 +939,7 @@ pub(crate) async fn load_tenant(
Ok(())
}
pub(crate) async fn ignore_tenant(
pub async fn ignore_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
@@ -989,7 +967,7 @@ async fn ignore_tenant0(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapListError {
pub enum TenantMapListError {
#[error("tenant map is still initiailizing")]
Initializing,
}
@@ -997,7 +975,7 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1015,7 +993,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
///
/// Downloading all the tenant data is performed in the background, this merely
/// spawns the background task and returns quickly.
pub(crate) async fn attach_tenant(
pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -1052,7 +1030,7 @@ pub(crate) async fn attach_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapInsertError {
pub enum TenantMapInsertError {
#[error("tenant map is still initializing")]
StillInitializing,
#[error("tenant map is shutting down")]
@@ -1215,7 +1193,7 @@ use {
utils::http::error::ApiError,
};
pub(crate) async fn immediate_gc(
pub async fn immediate_gc(
tenant_id: TenantId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,

View File

@@ -370,7 +370,7 @@ pub enum PageReconstructError {
/// An error happened replaying WAL records
#[error(transparent)]
WalRedo(anyhow::Error),
WalRedo(#[from] crate::walredo::WalRedoError),
}
impl std::fmt::Debug for PageReconstructError {
@@ -4327,7 +4327,7 @@ impl Timeline {
let img = match self
.walredo_mgr
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.context("Failed to reconstruct a page image")
.context("Failed to reconstruct a page image:")
{
Ok(img) => img,
Err(e) => return Err(PageReconstructError::from(e)),

View File

@@ -18,7 +18,6 @@
//! any WAL records, so that even if an attacker hijacks the Postgres
//! process, he cannot escape out of it.
//!
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
@@ -90,7 +89,7 @@ pub trait WalRedoManager: Send + Sync {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<Bytes>;
) -> Result<Bytes, WalRedoError>;
}
struct ProcessInput {
@@ -101,14 +100,6 @@ struct ProcessInput {
n_requests: usize,
}
enum ApplyWalRecordsError {
WithRequestNo {
error: anyhow::Error,
request_no: usize,
},
NoRequestNo(anyhow::Error),
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
@@ -149,6 +140,20 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
}
}
/// An error happened in WAL redo
#[derive(Debug, thiserror::Error)]
pub enum WalRedoError {
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error("cannot perform WAL redo now")]
InvalidState,
#[error("cannot perform WAL redo for this request")]
InvalidRequest,
#[error("cannot perform WAL redo for this record")]
InvalidRecord,
}
///
/// Public interface of WAL redo manager
///
@@ -166,9 +171,10 @@ impl WalRedoManager for PostgresRedoManager {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, WalRedoError> {
if records.is_empty() {
anyhow::bail!("invalid WAL redo request with no records");
error!("invalid WAL redo request with no records");
return Err(WalRedoError::InvalidRequest);
}
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
@@ -180,13 +186,7 @@ impl WalRedoManager for PostgresRedoManager {
if rec_neon != batch_neon {
let result = if batch_neon {
self.apply_batch_neon(
key,
lsn,
img,
&records[batch_start..i],
(batch_start, records.len()),
)
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
} else {
self.apply_batch_postgres(
key,
@@ -196,7 +196,6 @@ impl WalRedoManager for PostgresRedoManager {
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
(batch_start, records.len()),
)
};
img = Some(result?);
@@ -207,13 +206,7 @@ impl WalRedoManager for PostgresRedoManager {
}
// last batch
if batch_neon {
self.apply_batch_neon(
key,
lsn,
img,
&records[batch_start..],
(batch_start, records.len()),
)
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
} else {
self.apply_batch_postgres(
key,
@@ -223,7 +216,6 @@ impl WalRedoManager for PostgresRedoManager {
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
(batch_start, records.len()),
)
}
}
@@ -268,9 +260,8 @@ impl PostgresRedoManager {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
diag: impl std::fmt::Debug,
) -> anyhow::Result<Bytes> {
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let start_time = Instant::now();
let mut n_attempts = 0u32;
@@ -280,15 +271,15 @@ impl PostgresRedoManager {
// launch the WAL redo process on first use
if proc.is_none() {
self.launch(&mut proc, pg_version)
.context("launch process")?;
self.launch(&mut proc, pg_version)?;
}
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result =
self.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout);
let result = self
.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
let duration = end_time.duration_since(lock_time);
@@ -307,35 +298,26 @@ impl PostgresRedoManager {
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
debug!(
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {} diag={:?}",
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
len,
nbytes,
duration.as_micros(),
lsn,
diag,
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if let Err(e) = result.as_ref() {
let (e, request_no) = match e {
ApplyWalRecordsError::WithRequestNo { error, request_no } => {
(error, Some(*request_no))
}
ApplyWalRecordsError::NoRequestNo(e) => (e, None),
};
error!(
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={} request_no={:?} {:?}: {:?}",
n_attempts,
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}: {}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
base_img_lsn,
lsn,
n_attempts,
request_no,
diag,
e,
utils::error::report_compact_sources(e),
);
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
@@ -354,14 +336,11 @@ impl PostgresRedoManager {
proc.child.kill_and_wait();
}
} else if n_attempts != 0 {
info!(n_attempts, ?diag, "retried walredo succeeded");
info!(n_attempts, "retried walredo succeeded");
}
n_attempts += 1;
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
return result.map_err(|e| match e {
ApplyWalRecordsError::WithRequestNo { error, .. } => error,
ApplyWalRecordsError::NoRequestNo(e) => e,
});
return result;
}
}
}
@@ -375,8 +354,7 @@ impl PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
diag: impl std::fmt::Debug,
) -> anyhow::Result<Bytes> {
) -> Result<Bytes, WalRedoError> {
let start_time = Instant::now();
let mut page = BytesMut::new();
@@ -385,7 +363,8 @@ impl PostgresRedoManager {
page.extend_from_slice(&fpi[..]);
} else {
// All the current WAL record types that we can handle require a base image.
anyhow::bail!("invalid neon WAL redo request with no base image");
error!("invalid neon WAL redo request with no base image");
return Err(WalRedoError::InvalidRequest);
}
// Apply all the WAL records in the batch
@@ -398,11 +377,10 @@ impl PostgresRedoManager {
WAL_REDO_TIME.observe(duration.as_secs_f64());
debug!(
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {} diag={:?}",
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
records.len(),
duration.as_micros(),
lsn,
diag,
lsn
);
Ok(page.freeze())
@@ -414,13 +392,14 @@ impl PostgresRedoManager {
page: &mut BytesMut,
_record_lsn: Lsn,
record: &NeonWalRecord,
) -> anyhow::Result<()> {
) -> Result<(), WalRedoError> {
match record {
NeonWalRecord::Postgres {
will_init: _,
rec: _,
} => {
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
error!("tried to pass postgres wal record to neon WAL redo");
return Err(WalRedoError::InvalidRequest);
}
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
@@ -428,7 +407,7 @@ impl PostgresRedoManager {
flags,
} => {
// sanity check that this is modifying the correct relation
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert!(
rel.forknum == VISIBILITYMAP_FORKNUM,
"ClearVisibilityMapFlags record on unexpected rel {}",
@@ -466,7 +445,7 @@ impl PostgresRedoManager {
// same effects as the corresponding Postgres WAL redo function.
NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::Clog,
@@ -516,7 +495,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::ClogSetAborted { xids } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::Clog,
@@ -547,7 +526,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::MultiXactOffsets,
@@ -580,7 +559,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::MultixactMembersCreate { moff, members } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).context("invalid record")?;
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert_eq!(
slru_kind,
SlruKind::MultiXactMembers,
@@ -780,7 +759,7 @@ impl PostgresRedoManager {
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> Result<Bytes, ApplyWalRecordsError> {
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -803,9 +782,10 @@ impl PostgresRedoManager {
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"tried to pass neon wal record to postgres WAL redo"
)));
return Err(Error::new(
ErrorKind::Other,
"tried to pass neon wal record to postgres WAL redo",
));
}
}
build_get_page_msg(tag, &mut writebuf);
@@ -827,7 +807,7 @@ impl PostgresRedoManager {
writebuf: &[u8],
mut input: MutexGuard<Option<ProcessInput>>,
wal_redo_timeout: Duration,
) -> Result<Bytes, ApplyWalRecordsError> {
) -> Result<Bytes, std::io::Error> {
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
@@ -848,13 +828,10 @@ impl PostgresRedoManager {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
}?;
if n == 0 {
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo timed out"
)));
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
@@ -863,9 +840,7 @@ impl PostgresRedoManager {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr
.read(&mut errbuf)
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
@@ -880,23 +855,22 @@ impl PostgresRedoManager {
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo process closed its stderr unexpectedly"
)));
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If 'stdin' is writeable, do write.
let in_revents = pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc
.stdin
.write(&writebuf[nwrite..])
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo process closed its stdin unexpectedly"
)));
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
let request_no = proc.n_requests;
@@ -927,10 +901,10 @@ impl PostgresRedoManager {
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
});
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
@@ -946,14 +920,10 @@ impl PostgresRedoManager {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
}?;
if n == 0 {
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo timed out"),
});
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
@@ -962,12 +932,7 @@ impl PostgresRedoManager {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf).map_err(|e| {
ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!(e),
}
})?;
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
@@ -982,26 +947,21 @@ impl PostgresRedoManager {
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stderr unexpectedly"),
});
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[2].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..]).map_err(|e| {
ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!(e),
}
})?;
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
});
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
output
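
Most of this file's churn replaces anyhow::Error with the dedicated WalRedoError enum defined near the top of the hunk, so io::Error values propagate with `?` and callers can match on the failure kind. Below is a small stand-alone sketch of that thiserror pattern; the RedoError and read_page names are hypothetical, not the pageserver's types, and it assumes only the thiserror crate already used in the hunk.

use std::io::{self, ErrorKind, Read};

// Typed error: io::Error converts automatically via #[from],
// the other variant carries a fixed message.
#[derive(Debug, thiserror::Error)]
pub enum RedoError {
    #[error(transparent)]
    Io(#[from] io::Error),
    #[error("cannot perform redo for this request")]
    InvalidRequest,
}

// `?` on an io::Error becomes RedoError::Io without manual mapping.
fn read_page(mut input: impl Read, expected_len: usize) -> Result<Vec<u8>, RedoError> {
    if expected_len == 0 {
        return Err(RedoError::InvalidRequest);
    }
    let mut buf = vec![0u8; expected_len];
    input.read_exact(&mut buf)?; // io::Error -> RedoError::Io
    Ok(buf)
}

fn main() {
    // Too little data: read_exact fails and surfaces as the Io variant.
    let short: &[u8] = &[1, 2, 3];
    match read_page(short, 8) {
        Err(RedoError::Io(e)) if e.kind() == ErrorKind::UnexpectedEof => {
            eprintln!("io error as expected: {e}")
        }
        other => eprintln!("unexpected outcome: {other:?}"),
    }
    // Empty request: rejected up front with the typed variant.
    assert!(matches!(read_page(short, 0), Err(RedoError::InvalidRequest)));
}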

View File

@@ -1,6 +1,5 @@
use futures::future::Either;
use proxy::auth;
use proxy::config::HttpConfig;
use proxy::console;
use proxy::http;
use proxy::metrics;
@@ -80,9 +79,6 @@ struct ProxyCliArgs {
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
/// timeout for http connections
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
}
#[tokio::main]
@@ -224,15 +220,12 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
auth::BackendType::Link(Cow::Owned(url))
}
};
let http_config = HttpConfig {
sql_over_http_timeout: args.sql_over_http_timeout,
};
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
}));
Ok(config)
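
The sql_over_http_timeout flag shown in this hunk parses its "15s" default with humantime::parse_duration via clap's value_parser. A minimal sketch of that duration parsing on its own, assuming only the humantime crate and leaving out the clap wiring:

use std::time::Duration;

fn main() {
    // humantime accepts human-friendly duration strings like the "15s" default above.
    let t = humantime::parse_duration("15s").expect("valid duration");
    assert_eq!(t, Duration::from_secs(15));

    let short = humantime::parse_duration("500ms").expect("valid duration");
    assert_eq!(short, Duration::from_millis(500));

    println!("timeout = {t:?}, short = {short:?}");
}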

View File

@@ -13,7 +13,6 @@ pub struct ProxyConfig {
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
}
#[derive(Debug)]
@@ -27,10 +26,6 @@ pub struct TlsConfig {
pub common_names: Option<HashSet<String>>,
}
pub struct HttpConfig {
pub sql_over_http_timeout: tokio::time::Duration,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()

View File

@@ -20,7 +20,6 @@ use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
};
use crate::{compute, config};
@@ -419,42 +418,36 @@ async fn connect_to_compute_once(
};
tokio::spawn(
async move {
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
loop {
let message = ready!(connection.poll_message(cx));
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
}
}).await
}
}
})
.instrument(span)
);

View File

@@ -24,9 +24,6 @@ use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use crate::config::HttpConfig;
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -102,9 +99,9 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
// convert to text with escaping
Value::Bool(_) => serde_json::to_string(value).map(Some),
Value::Number(_) => serde_json::to_string(value).map(Some),
Value::Object(_) => serde_json::to_string(value).map(Some),
// here string needs to be escaped, as it is part of the array
Value::Object(_) => json_array_to_pg_array(&Value::String(serde_json::to_string(value)?)),
Value::String(_) => serde_json::to_string(value).map(Some),
// recurse into array
@@ -191,46 +188,28 @@ pub async fn handle(
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
config: &'static HttpConfig,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.sql_over_http_timeout,
handle_inner(request, sni_hostname, conn_pool, session_id),
)
.await;
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
let mut response = match result {
Ok(r) => match r {
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
e.code()
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
});
let code = match code {
Some(c) => c,
Ok(r) => r,
Err(e) => {
let message = format!("{:?}", e);
let code = match e.downcast_ref::<tokio_postgres::Error>() {
Some(e) => match e.code() {
Some(e) => serde_json::to_value(e.code()).unwrap(),
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
},
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
config.sql_over_http_timeout.as_secs()
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
error!(message);
// TODO: this shouldn't always be bad request.
json_response(
StatusCode::GATEWAY_TIMEOUT,
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
StatusCode::BAD_REQUEST,
json!({ "message": message, "code": code }),
)?
}
};
@@ -248,13 +227,6 @@ async fn handle_inner(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&["http"])
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
//
// Determine the destination and connection params
//
@@ -613,7 +585,7 @@ fn _pg_array_parse(
}
}
}
'}' if !quote => {
'}' => {
level -= 1;
if level == 0 {
push_checked(&mut entry, &mut entries, elem_type)?;
@@ -697,14 +669,6 @@ mod tests {
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
)]
);
// array of objects
let json = r#"[{"foo": 1},{"bar": 2}]"#;
let json: Value = serde_json::from_str(json).unwrap();
let pg_params = json_to_pg_text(vec![json]).unwrap();
assert_eq!(
pg_params,
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
);
}
#[test]
@@ -832,23 +796,4 @@ mod tests {
json!([[[1, 2, 3], [4, 5, 6]]])
);
}
#[test]
fn test_pg_array_parse_json() {
fn pt(pg_arr: &str) -> Value {
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
}
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
assert_eq!(
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
json!([{"foo": 1, "bar": 2}])
);
assert_eq!(
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
json!([{"foo": 1}, {"bar": 2}])
);
assert_eq!(
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
json!([[{"foo": 1}, {"bar": 2}]])
);
}
}
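
In the longer variant of handle in this hunk, handle_inner runs inside tokio::time::timeout and the elapsed case is reported as a gateway timeout while inner errors map to a bad request. A reduced sketch of that control flow, with hypothetical slow_query and Reply stand-ins for the proxy's handler and HTTP response:

use std::time::Duration;
use tokio::time::{sleep, timeout};

#[derive(Debug)]
enum Reply {
    Ok(String),
    BadRequest(String),
    GatewayTimeout(String),
}

async fn slow_query() -> Result<String, String> {
    // Stands in for handle_inner: takes longer than a small time budget.
    sleep(Duration::from_millis(200)).await;
    Ok("rows".to_string())
}

async fn handle(budget: Duration) -> Reply {
    // timeout() resolves to Err(Elapsed) if the inner future is too slow,
    // otherwise to Ok(<inner result>), mirroring the nested match in the hunk.
    match timeout(budget, slow_query()).await {
        Ok(Ok(rows)) => Reply::Ok(rows),
        Ok(Err(e)) => Reply::BadRequest(e),
        Err(_elapsed) => Reply::GatewayTimeout(format!(
            "execution time exceeded {} ms",
            budget.as_millis()
        )),
    }
}

#[tokio::main]
async fn main() {
    println!("{:?}", handle(Duration::from_millis(50)).await); // GatewayTimeout
    println!("{:?}", handle(Duration::from_millis(500)).await); // Ok("rows")
}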

View File

@@ -3,10 +3,7 @@ use crate::{
config::ProxyConfig,
error::io_error,
protocol2::{ProxyProtocolAccept, WithClientIp},
proxy::{
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
},
proxy::{handle_client, ClientMode},
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
@@ -205,14 +202,7 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
sql_over_http::handle(
request,
sni_hostname,
conn_pool,
session_id,
&config.http_config,
)
.await
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")
@@ -285,25 +275,23 @@ pub async fn task_main(
let conn_pool = conn_pool.clone();
async move {
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
},
)))
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
}))
}
},
);
@@ -315,41 +303,3 @@ pub async fn task_main(
Ok(())
}
struct MetricService<S> {
inner: S,
}
impl<S> MetricService<S> {
fn new(inner: S) -> MetricService<S> {
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&["http"])
.inc();
MetricService { inner }
}
}
impl<S> Drop for MetricService<S> {
fn drop(&mut self) {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
.with_label_values(&["http"])
.inc();
}
}
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
where
S: hyper::service::Service<Request<ReqBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
self.inner.call(req)
}
}

View File

@@ -7,7 +7,6 @@ use crate::{
compute::{self, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
http::StatusCode,
metrics::{Ids, USAGE_METRICS},
protocol2::WithClientIp,
stream::{PqStream, Stream},
@@ -39,55 +38,19 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_db_connections_total",
"Number of opened connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_db_connections_total",
"Number of closed connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_client_connections_total",
"Number of opened connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_client_connections_total",
"Number of closed connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of client connections accepted.",
"Number of TCP client connections accepted.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_connections_total",
"Number of client connections closed.",
"Number of TCP client connections closed.",
&["protocol"],
)
.unwrap()
@@ -112,15 +75,6 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_breakdown",
"Number of wake-up failures (per kind).",
&["retry", "kind"],
)
.unwrap()
});
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_io_bytes_per_client",
@@ -254,16 +208,12 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
"handling interactive connection from client"
);
let proto = mode.protocol_label();
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&[proto])
.with_label_values(&[mode.protocol_label()])
.inc();
scopeguard::defer! {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
}
let tls = config.tls_config.as_ref();
@@ -298,7 +248,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
mode.allow_self_signed_compute(config),
);
cancel_map
.with_session(|session| client.connect_to_db(session, mode))
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
.await
}
@@ -447,46 +397,6 @@ impl ConnectMechanism for TcpMechanism<'_> {
}
}
const fn bool_to_str(x: bool) -> &'static str {
if x {
"true"
} else {
"false"
}
}
fn report_error(e: &WakeComputeError, retry: bool) {
use crate::console::errors::ApiError;
let retry = bool_to_str(retry);
let kind = match e {
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
ref text,
}) if text.contains("written data quota exceeded")
|| text.contains("the limit for current plan reached") =>
{
"quota_exceeded"
}
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::LOCKED,
..
}) => "api_console_locked",
WakeComputeError::ApiError(ApiError::Console {
status: StatusCode::BAD_REQUEST,
..
}) => "api_console_bad_request",
WakeComputeError::ApiError(ApiError::Console { status, .. })
if status.is_server_error() =>
{
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
@@ -530,12 +440,10 @@ where
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
report_error(&e, false);
return Err(e.into());
}
// failed to wake up but we can continue to retry
Ok(ControlFlow::Continue(e)) => {
report_error(&e, true);
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
}
// successfully woke up a compute node and can break the wakeup loop
@@ -774,7 +682,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
async fn connect_to_db(
self,
session: cancellation::Session<'_>,
mode: ClientMode,
allow_cleartext: bool,
) -> anyhow::Result<()> {
let Self {
mut stream,
@@ -790,7 +698,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
};
let auth_result = match creds
.authenticate(&extra, &mut stream, mode.allow_cleartext())
.authenticate(&extra, &mut stream, allow_cleartext)
.await
{
Ok(auth_result) => auth_result,
@@ -816,14 +724,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.or_else(|e| stream.throw_error(e))
.await?;
let proto = mode.protocol_label();
NUM_DB_CONNECTIONS_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm

View File

@@ -374,12 +374,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
#[allow(clippy::mutable_key_type)]
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
["/v1/status", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect()
});
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {

View File

@@ -65,7 +65,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
def start_single_table_workload(table_id: int):
for _ in range(num_iters):
with env.pg.connect(options="-cstatement_timeout=300s").cursor() as cur:
with env.pg.connect().cursor() as cur:
cur.execute(
f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
)

View File

@@ -188,7 +188,7 @@ def test_sql_over_http(static_proxy: NeonProxy):
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == 200, response.text
assert response.status_code == 200
return response.json()
rows = q("select 42 as answer")["rows"]
@@ -206,12 +206,6 @@ def test_sql_over_http(static_proxy: NeonProxy):
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
assert rows == [{"answer": {"b": 42}}]
rows = q("select $1::jsonb[] as answer", [[{}]])["rows"]
assert rows == [{"answer": [{}]}]
rows = q("select $1::jsonb[] as answer", [[{"foo": 1}, {"bar": 2}]])["rows"]
assert rows == [{"answer": [{"foo": 1}, {"bar": 2}]}]
rows = q("select * from pg_class limit 1")["rows"]
assert len(rows) == 1

View File

@@ -6,7 +6,6 @@ from pathlib import Path
from typing import List, Optional
import asyncpg
import pytest
import toml
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
@@ -598,10 +597,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
assert res == expected_sum
# Do inserts while restarting postgres and messing with safekeeper addresses.
# The test takes more than default 5 minutes on Postgres 16,
# see https://github.com/neondatabase/neon/issues/5305
@pytest.mark.timeout(600)
# do inserts while restarting postgres and messing with safekeeper addresses
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()