Compare commits

..

1 Commits

Author SHA1 Message Date
Alex Chi Z
ab7e5fbf95 feat(pageserver): add PostHog config section
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-07 14:34:21 +08:00
47 changed files with 318 additions and 828 deletions

View File

@@ -348,7 +348,6 @@ async fn run_dump_restore(
"--no-security-labels".to_string(),
"--no-subscriptions".to_string(),
"--no-tablespaces".to_string(),
"--no-event-triggers".to_string(),
// format
"--format".to_string(),
"directory".to_string(),

View File

@@ -329,39 +329,11 @@ struct StartVmMonitorResult {
impl ComputeNode {
pub fn new(params: ComputeNodeParams, config: ComputeConfig) -> Result<Self> {
let connstr = params.connstr.as_str();
let mut conn_conf = postgres::config::Config::from_str(connstr)
let conn_conf = postgres::config::Config::from_str(connstr)
.context("cannot build postgres config from connstr")?;
let mut tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
let tokio_conn_conf = tokio_postgres::config::Config::from_str(connstr)
.context("cannot build tokio postgres config from connstr")?;
// Users can set some configuration parameters per database with
// ALTER DATABASE ... SET ...
//
// There are at least these parameters:
//
// - role=some_other_role
// - default_transaction_read_only=on
// - statement_timeout=1, i.e., 1ms, which will cause most of the queries to fail
// - search_path=non_public_schema, this should be actually safe because
// we don't call any functions in user databases, but better to always reset
// it to public.
//
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
//
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
// as well. After rolling out this code, we can remove this parameter from control plane.
// In the meantime, double-passing is fine, the last value is applied.
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
let options = match conn_conf.get_options() {
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),
None => EXTRA_OPTIONS.to_string(),
};
conn_conf.options(&options);
tokio_conn_conf.options(&options);
let mut new_state = ComputeState::new();
if let Some(spec) = config.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
@@ -1477,20 +1449,15 @@ impl ComputeNode {
Err(e) => match e.code() {
Some(&SqlState::INVALID_PASSWORD)
| Some(&SqlState::INVALID_AUTHORIZATION_SPECIFICATION) => {
// Connect with `zenith_admin` if `cloud_admin` could not authenticate
// Connect with zenith_admin if cloud_admin could not authenticate
info!(
"cannot connect to Postgres: {}, retrying with 'zenith_admin' username",
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
e
);
let mut zenith_admin_conf = postgres::config::Config::from(conf.clone());
zenith_admin_conf.application_name("compute_ctl:apply_config");
zenith_admin_conf.user("zenith_admin");
// It doesn't matter what were the options before, here we just want
// to connect and create a new superuser role.
const ZENITH_OPTIONS: &str = "-c role=zenith_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
zenith_admin_conf.options(ZENITH_OPTIONS);
let mut client =
zenith_admin_conf.connect(NoTls)
.context("broken cloud_admin credential: tried connecting with cloud_admin but could not authenticate, and zenith_admin does not work either")?;
@@ -1656,7 +1623,9 @@ impl ComputeNode {
self.pg_reload_conf()?;
if spec.mode == ComputeMode::Primary {
let conf = self.get_tokio_conn_conf(Some("compute_ctl:reconfigure"));
let mut conf =
tokio_postgres::Config::from_str(self.params.connstr.as_str()).unwrap();
conf.application_name("apply_config");
let conf = Arc::new(conf);
let spec = Arc::new(spec.clone());

View File

@@ -1416,14 +1416,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let pageserver_id = args.endpoint_pageserver_id;
let remote_ext_config = &args.remote_ext_config;
let default_generation = env
.storage_controller
.timelines_onto_safekeepers
.then_some(1);
let safekeepers_generation = args
.safekeepers_generation
.or(default_generation)
.map(SafekeeperGeneration::new);
let safekeepers_generation = args.safekeepers_generation.map(SafekeeperGeneration::new);
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = if let Some(safekeepers) = parse_safekeepers(&args.safekeepers)? {

View File

@@ -10,8 +10,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest,
SafekeeperSchedulingPolicyRequest, SkSchedulingPolicy, TenantCreateRequest,
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse,
};
use pageserver_api::models::{
@@ -21,7 +20,7 @@ use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use pem::Pem;
use postgres_backend::AuthType;
use reqwest::{Method, Response};
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
@@ -571,11 +570,6 @@ impl StorageController {
let peer_jwt_token = encode_from_key_file(&peer_claims, private_key)
.expect("failed to generate jwt token");
args.push(format!("--peer-jwt-token={peer_jwt_token}"));
let claims = Claims::new(None, Scope::SafekeeperData);
let jwt_token =
encode_from_key_file(&claims, private_key).expect("failed to generate jwt token");
args.push(format!("--safekeeper-jwt-token={jwt_token}"));
}
if let Some(public_key) = &self.public_key {
@@ -620,10 +614,6 @@ impl StorageController {
self.env.base_data_dir.display()
));
if self.env.safekeepers.iter().any(|sk| sk.auth_enabled) && self.private_key.is_none() {
anyhow::bail!("Safekeeper set up for auth but no private key specified");
}
if self.config.timelines_onto_safekeepers {
args.push("--timelines-onto-safekeepers".to_string());
}
@@ -650,10 +640,6 @@ impl StorageController {
)
.await?;
if self.config.timelines_onto_safekeepers {
self.register_safekeepers().await?;
}
Ok(())
}
@@ -757,23 +743,6 @@ impl StorageController {
where
RQ: Serialize + Sized,
RS: DeserializeOwned + Sized,
{
let response = self.dispatch_inner(method, path, body).await?;
Ok(response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
}
/// Simple HTTP request wrapper for calling into storage controller
async fn dispatch_inner<RQ>(
&self,
method: reqwest::Method,
path: String,
body: Option<RQ>,
) -> anyhow::Result<Response>
where
RQ: Serialize + Sized,
{
// In the special case of the `storage_controller start` subcommand, we wish
// to use the API endpoint of the newly started storage controller in order
@@ -816,31 +785,10 @@ impl StorageController {
let response = builder.send().await?;
let response = response.error_from_body().await?;
Ok(response)
}
/// Register the safekeepers in the storage controller
#[instrument(skip(self))]
async fn register_safekeepers(&self) -> anyhow::Result<()> {
for sk in self.env.safekeepers.iter() {
let sk_id = sk.id;
let body = serde_json::json!({
"id": sk_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "127.0.0.1",
"port": sk.pg_port,
"http_port": sk.http_port,
"https_port": sk.https_port,
"version": 5957,
"availability_zone_id": format!("us-east-2b-{sk_id}"),
});
self.upsert_safekeeper(sk_id, body).await?;
self.safekeeper_scheduling_policy(sk_id, SkSchedulingPolicy::Active)
.await?;
}
Ok(())
Ok(response
.json()
.await
.map_err(pageserver_client::mgmt_api::Error::ReceiveBody)?)
}
/// Call into the attach_hook API, for use before handing out attachments to pageservers
@@ -868,42 +816,6 @@ impl StorageController {
Ok(response.generation)
}
#[instrument(skip(self))]
pub async fn upsert_safekeeper(
&self,
node_id: NodeId,
request: serde_json::Value,
) -> anyhow::Result<()> {
let resp = self
.dispatch_inner::<serde_json::Value>(
Method::POST,
format!("control/v1/safekeeper/{node_id}"),
Some(request),
)
.await?;
if !resp.status().is_success() {
anyhow::bail!(
"setting scheduling policy unsuccessful for safekeeper {node_id}: {}",
resp.status()
);
}
Ok(())
}
#[instrument(skip(self))]
pub async fn safekeeper_scheduling_policy(
&self,
node_id: NodeId,
scheduling_policy: SkSchedulingPolicy,
) -> anyhow::Result<()> {
self.dispatch::<SafekeeperSchedulingPolicyRequest, ()>(
Method::POST,
format!("control/v1/safekeeper/{node_id}/scheduling_policy"),
Some(SafekeeperSchedulingPolicyRequest { scheduling_policy }),
)
.await
}
#[instrument(skip(self))]
pub async fn inspect(
&self,

View File

@@ -43,6 +43,21 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
/// PostHog integration config
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
project_id: String,
/// Server-side (private) API key
server_api_key: String,
/// Client-side (public) API key
client_api_key: String,
/// Private API URL
private_api_url: String,
/// Public API URL
public_api_url: String,
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer
@@ -182,7 +197,8 @@ pub struct ConfigToml {
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
pub timeline_import_config: TimelineImportConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub posthog_config: Option<PostHogConfig>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -301,12 +317,6 @@ impl From<OtelExporterProtocol> for tracing_utils::Protocol {
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TimelineImportConfig {
pub import_job_concurrency: NonZeroUsize,
pub import_job_soft_size_limit: NonZeroUsize,
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -666,10 +676,7 @@ impl Default for ConfigToml {
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
timeline_import_config: TimelineImportConfig {
import_job_concurrency: NonZeroUsize::new(128).unwrap(),
import_job_soft_size_limit: NonZeroUsize::new(1024 * 1024 * 1024).unwrap(),
},
posthog_config: None,
}
}
}

View File

@@ -17,7 +17,7 @@ impl std::fmt::Display for RateLimitStats {
}
impl RateLimit {
pub const fn new(interval: Duration) -> Self {
pub fn new(interval: Duration) -> Self {
Self {
last: None,
interval,

View File

@@ -14,7 +14,7 @@ use std::time::Duration;
use anyhow::{Context, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes, PostHogConfig};
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pem::Pem;
@@ -231,7 +231,8 @@ pub struct PageServerConf {
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
/// PostHog integration config
pub posthog_config: Option<PostHogConfig>,
}
/// Token for authentication to safekeepers
@@ -406,7 +407,7 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
posthog_config,
} = config_toml;
let mut conf = PageServerConf {
@@ -460,7 +461,6 @@ impl PageServerConf {
tracing,
enable_tls_page_service_api,
dev_mode,
timeline_import_config,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -517,6 +517,7 @@ impl PageServerConf {
}
None => Vec::new(),
},
posthog_config,
};
// ------------------------------------------------------------

View File

@@ -1038,23 +1038,21 @@ impl PageServerHandler {
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
request_id = %req.hdr.reqid,
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since,
not_modified_since_lsn = %req.hdr.not_modified_since
)
}};
($shard_id:expr) => {{
tracing::info_span!(
parent: &parent_span,
"handle_get_page_request",
request_id = %req.hdr.reqid,
rel = %req.rel,
blkno = %req.blkno,
req_lsn = %req.hdr.request_lsn,
not_modified_since_lsn = %req.hdr.not_modified_since,
shard_id = %$shard_id,
shard_id = %$shard_id
)
}};
}

View File

@@ -40,7 +40,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::context::{PerfInstrumentFutureExt, RequestContext};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
@@ -275,30 +275,24 @@ impl Timeline {
continue;
}
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.attached_child();
match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
}
let nblocks = match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_REL_SIZE",
reltag=%tag,
lsn=%lsn,
)
})
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
}
};
@@ -314,17 +308,6 @@ impl Timeline {
let key = rel_block_to_key(*tag, *blknum);
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"GET_BATCH",
batch_size = %page_count,
)
})
.attached_child();
let key_slots = keys_slots.entry(key).or_default();
key_slots.push((response_slot_idx, ctx));
@@ -340,7 +323,14 @@ impl Timeline {
let query = VersionedKeySpaceQuery::scattered(query);
let res = self
.get_vectored(query, io_concurrency, ctx)
.maybe_perf_instrument(ctx, |current_perf_span| current_perf_span.clone())
.maybe_perf_instrument(ctx, |current_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: current_perf_span,
"GET_BATCH",
batch_size = %page_count,
)
})
.await;
match res {

View File

@@ -94,23 +94,10 @@ impl Header {
pub enum WriteBlobError {
#[error(transparent)]
Flush(FlushTaskError),
#[error("blob too large ({len} bytes)")]
BlobTooLarge { len: usize },
#[error(transparent)]
Other(anyhow::Error),
}
impl WriteBlobError {
pub fn is_cancel(&self) -> bool {
match self {
WriteBlobError::Flush(e) => e.is_cancel(),
WriteBlobError::Other(_) => false,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
WriteBlobError::Flush(e) => e.into_anyhow(),
WriteBlobError::Other(e) => e,
}
}
WriteBlobRaw(anyhow::Error),
}
impl BlockCursor<'_> {
@@ -340,9 +327,7 @@ where
return (
(
io_buf.slice_len(),
Err(WriteBlobError::Other(anyhow::anyhow!(
"blob too large ({len} bytes)"
))),
Err(WriteBlobError::BlobTooLarge { len }),
),
srcbuf,
);
@@ -406,7 +391,7 @@ where
// Verify the header, to ensure we don't write invalid/corrupt data.
let header = match Header::decode(&raw_with_header)
.context("decoding blob header")
.map_err(WriteBlobError::Other)
.map_err(WriteBlobError::WriteBlobRaw)
{
Ok(header) => header,
Err(err) => return (raw_with_header, Err(err)),
@@ -416,7 +401,7 @@ where
let raw_len = raw_with_header.len();
return (
raw_with_header,
Err(WriteBlobError::Other(anyhow::anyhow!(
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
"header length mismatch: {header_total_len} != {raw_len}"
))),
);

View File

@@ -2,7 +2,6 @@
pub mod batch_split_writer;
pub mod delta_layer;
pub mod errors;
pub mod filter_iterator;
pub mod image_layer;
pub mod inmemory_layer;

View File

@@ -10,7 +10,6 @@ use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use super::errors::PutError;
use super::layer::S3_UPLOAD_LIMIT;
use super::{
DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
@@ -236,7 +235,7 @@ impl<'a> SplitImageLayerWriter<'a> {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
// because we did not consider compression in this estimation. The resulting image layer
// could be smaller than the target size.
@@ -254,8 +253,7 @@ impl<'a> SplitImageLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?;
.await?;
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
self.batches.add_unfinished_image_writer(
prev_image_writer,
@@ -348,7 +346,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
// number, and therefore the final layer size could be a little bit larger or smaller than the target.
//
@@ -368,8 +366,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?,
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
@@ -389,8 +386,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
self.cancel.clone(),
ctx,
)
.await
.map_err(PutError::Other)?;
.await?;
let (start_key, prev_delta_writer) =
self.inner.replace((key, next_delta_writer)).unwrap();
self.batches.add_unfinished_delta_writer(
@@ -400,11 +396,11 @@ impl<'a> SplitDeltaLayerWriter<'a> {
);
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
return Err(PutError::Other(anyhow::anyhow!(
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
)));
);
}
}
self.last_key_written = key;

View File

@@ -55,7 +55,6 @@ use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use super::errors::PutError;
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
@@ -478,15 +477,12 @@ impl DeltaLayerWriterInner {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
let (_, res) = self
.put_value_bytes(
key,
lsn,
Value::ser(&val)
.map_err(anyhow::Error::new)
.map_err(PutError::Other)?
.slice_len(),
Value::ser(&val)?.slice_len(),
val.will_init(),
ctx,
)
@@ -501,7 +497,7 @@ impl DeltaLayerWriterInner {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), PutError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{
@@ -517,24 +513,19 @@ impl DeltaLayerWriterInner {
.blob_writer
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let res = res.map_err(PutError::WriteBlob);
let off = match res {
Ok((off, _)) => off,
Err(e) => return (val, Err(e)),
Err(e) => return (val, Err(anyhow::anyhow!(e))),
};
let blob_ref = BlobRef::new(off, will_init);
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
let res = self
.tree
.append(&delta_key.0, blob_ref.0)
.map_err(anyhow::Error::new)
.map_err(PutError::Other);
let res = self.tree.append(&delta_key.0, blob_ref.0);
self.num_keys += 1;
(val, res)
(val, res.map_err(|e| anyhow::anyhow!(e)))
}
fn size(&self) -> u64 {
@@ -703,7 +694,7 @@ impl DeltaLayerWriter {
lsn: Lsn,
val: Value,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
self.inner
.as_mut()
.unwrap()
@@ -718,7 +709,7 @@ impl DeltaLayerWriter {
val: FullSlice<Buf>,
will_init: bool,
ctx: &RequestContext,
) -> (FullSlice<Buf>, Result<(), PutError>)
) -> (FullSlice<Buf>, anyhow::Result<()>)
where
Buf: IoBuf + Send,
{

View File

@@ -1,24 +0,0 @@
use crate::tenant::blob_io::WriteBlobError;
#[derive(Debug, thiserror::Error)]
pub enum PutError {
#[error(transparent)]
WriteBlob(WriteBlobError),
#[error(transparent)]
Other(anyhow::Error),
}
impl PutError {
pub fn is_cancel(&self) -> bool {
match self {
PutError::WriteBlob(e) => e.is_cancel(),
PutError::Other(_) => false,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
PutError::WriteBlob(e) => e.into_anyhow(),
PutError::Other(e) => e,
}
}
}

View File

@@ -53,7 +53,6 @@ use utils::bin_ser::SerializeError;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use super::errors::PutError;
use super::layer_name::ImageLayerName;
use super::{
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
@@ -843,14 +842,8 @@ impl ImageLayerWriterInner {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
if !self.key_range.contains(&key) {
return Err(PutError::Other(anyhow::anyhow!(
"key {:?} not in range {:?}",
key,
self.key_range
)));
}
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
@@ -860,7 +853,7 @@ impl ImageLayerWriterInner {
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
let (off, compression_info) = res?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
@@ -872,10 +865,7 @@ impl ImageLayerWriterInner {
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree
.append(&keybuf, off)
.map_err(anyhow::Error::new)
.map_err(PutError::Other)?;
self.tree.append(&keybuf, off)?;
#[cfg(feature = "testing")]
{
@@ -1095,7 +1085,7 @@ impl ImageLayerWriter {
key: Key,
img: Bytes,
ctx: &RequestContext,
) -> Result<(), PutError> {
) -> anyhow::Result<()> {
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
}

View File

@@ -23,7 +23,7 @@ use super::{
LayerVisibilityHint, PerfInstrumentFutureExt, PersistentLayerDesc, ValuesReconstructState,
};
use crate::config::PageServerConf;
use crate::context::{RequestContext, RequestContextBuilder};
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -1076,17 +1076,24 @@ impl LayerInner {
return Err(DownloadError::DownloadRequired);
}
let ctx = RequestContextBuilder::from(ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason,
)
})
.attached_child();
let ctx = if ctx.has_perf_span() {
let dl_ctx = RequestContextBuilder::from(ctx)
.task_kind(TaskKind::LayerDownload)
.download_behavior(DownloadBehavior::Download)
.root_perf_span(|| {
info_span!(
target: PERF_TRACE_TARGET,
"DOWNLOAD_LAYER",
layer = %self,
reason = %reason
)
})
.detached_child();
ctx.perf_follows_from(&dl_ctx);
dl_ctx
} else {
ctx.attached_child()
};
async move {
tracing::info!(%reason, "downloading on-demand");
@@ -1094,7 +1101,7 @@ impl LayerInner {
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());
let res = self
.download_init_and_wait(timeline, permit, ctx.attached_child())
.maybe_perf_instrument(&ctx, |current_perf_span| current_perf_span.clone())
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await?;
scopeguard::ScopeGuard::into_inner(init_cancelled);
@@ -1702,7 +1709,7 @@ impl DownloadError {
}
}
#[derive(Debug, PartialEq, Copy, Clone)]
#[derive(Debug, PartialEq)]
pub(crate) enum NeedsDownload {
NotFound,
NotFile(std::fs::FileType),

View File

@@ -340,7 +340,7 @@ pub(crate) fn log_compaction_error(
} else {
match level {
Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
Level::ERROR => error!("Compaction failed: {err:?}"),
Level::ERROR => error!("Compaction failed: {err:#}"),
Level::INFO => info!("Compaction failed: {err:#}"),
level => unimplemented!("unexpected level {level:?}"),
}

View File

@@ -987,16 +987,6 @@ impl From<PageReconstructError> for CreateImageLayersError {
}
}
impl From<super::storage_layer::errors::PutError> for CreateImageLayersError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CreateImageLayersError::Cancelled
} else {
CreateImageLayersError::Other(e.into_anyhow())
}
}
}
impl From<GetVectoredError> for CreateImageLayersError {
fn from(e: GetVectoredError) -> Self {
match e {
@@ -2127,14 +2117,22 @@ impl Timeline {
debug_assert_current_span_has_tenant_and_timeline_id();
// Regardless of whether we're going to try_freeze_and_flush
// or not, stop ingesting any more data.
// or not, stop ingesting any more data. Walreceiver only provides
// cancellation but no "wait until gone", because it uses the Timeline::gate.
// So, only after the self.gate.close() below will we know for sure that
// no walreceiver tasks are left.
// For `try_freeze_and_flush=true`, this means that we might still be ingesting
// data during the call to `self.freeze_and_flush()` below.
// That's not ideal, but, we don't have the concept of a ChildGuard,
// which is what we'd need to properly model early shutdown of the walreceiver
// task sub-tree before the other Timeline task sub-trees.
let walreceiver = self.walreceiver.lock().unwrap().take();
tracing::debug!(
is_some = walreceiver.is_some(),
"Waiting for WalReceiverManager..."
);
if let Some(walreceiver) = walreceiver {
walreceiver.shutdown().await;
walreceiver.cancel();
}
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();
@@ -5925,16 +5923,6 @@ impl From<layer_manager::Shutdown> for CompactionError {
}
}
impl From<super::storage_layer::errors::PutError> for CompactionError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CompactionError::ShuttingDown
} else {
CompactionError::Other(e.into_anyhow())
}
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);

View File

@@ -2204,7 +2204,8 @@ impl Timeline {
.as_mut()
.unwrap()
.put_value(key, lsn, value, ctx)
.await?;
.await
.map_err(CompactionError::Other)?;
} else {
let owner = self.shard_identity.get_shard_number(&key);

View File

@@ -149,7 +149,14 @@ pub async fn doit(
}
.await?;
flow::run(timeline.clone(), control_file, storage.clone(), ctx).await?;
flow::run(
timeline.clone(),
base_lsn,
control_file,
storage.clone(),
ctx,
)
.await?;
//
// Communicate that shard is done.

View File

@@ -34,9 +34,7 @@ use std::sync::Arc;
use anyhow::{bail, ensure};
use bytes::Bytes;
use futures::stream::FuturesOrdered;
use itertools::Itertools;
use pageserver_api::config::TimelineImportConfig;
use pageserver_api::key::{
CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -48,9 +46,8 @@ use pageserver_api::shard::ShardIdentity;
use postgres_ffi::relfile_utils::parse_relfilename;
use postgres_ffi::{BLCKSZ, pg_constants};
use remote_storage::RemotePath;
use tokio::sync::Semaphore;
use tokio_stream::StreamExt;
use tracing::{debug, instrument};
use tokio::task::JoinSet;
use tracing::{Instrument, debug, info_span, instrument};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
@@ -66,40 +63,38 @@ use crate::tenant::storage_layer::{ImageLayerWriter, Layer};
pub async fn run(
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
control_file: ControlFile,
storage: RemoteStorageWrapper,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let planner = Planner {
Flow {
timeline,
pgdata_lsn,
control_file,
storage: storage.clone(),
shard: timeline.shard_identity,
tasks: Vec::default(),
};
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
plan.execute(timeline, import_config, ctx).await
tasks: Vec::new(),
storage,
}
.run(ctx)
.await
}
struct Planner {
struct Flow {
timeline: Arc<Timeline>,
pgdata_lsn: Lsn,
control_file: ControlFile,
storage: RemoteStorageWrapper,
shard: ShardIdentity,
tasks: Vec<AnyImportTask>,
storage: RemoteStorageWrapper,
}
struct Plan {
jobs: Vec<ChunkProcessingJob>,
}
impl Planner {
/// Creates an import plan
///
/// This function is and must remain pure: given the same input, it will generate the same import plan.
async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
impl Flow {
/// Perform the ingestion into [`Self::timeline`].
/// Assumes the timeline is empty (= no layers).
pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
self.pgdata_lsn = pgdata_lsn;
let datadir = PgDataDir::new(&self.storage).await?;
// Import dbdir (00:00:00 keyspace)
@@ -120,7 +115,7 @@ impl Planner {
}
// Import SLRUs
if self.shard.is_shard_zero() {
if self.timeline.tenant_shard_id.is_shard_zero() {
// pg_xact (01:00 keyspace)
self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
.await?;
@@ -171,16 +166,14 @@ impl Planner {
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
let mut current_chunk_size: usize = 0;
let mut jobs = Vec::new();
let mut parallel_jobs = Vec::new();
for task in std::mem::take(&mut self.tasks).into_iter() {
if current_chunk_size + task.total_size()
> import_config.import_job_soft_size_limit.into()
{
if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
let key_range = last_end_key..task.key_range().start;
jobs.push(ChunkProcessingJob::new(
parallel_jobs.push(ChunkProcessingJob::new(
key_range.clone(),
std::mem::take(&mut current_chunk),
pgdata_lsn,
&self,
));
last_end_key = key_range.end;
current_chunk_size = 0;
@@ -188,13 +181,45 @@ impl Planner {
current_chunk_size += task.total_size();
current_chunk.push(task);
}
jobs.push(ChunkProcessingJob::new(
parallel_jobs.push(ChunkProcessingJob::new(
last_end_key..Key::MAX,
current_chunk,
pgdata_lsn,
&self,
));
Ok(Plan { jobs })
// Start all jobs simultaneosly
let mut work = JoinSet::new();
// TODO: semaphore?
for job in parallel_jobs {
let ctx: RequestContext =
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
}
let mut results = Vec::new();
while let Some(result) = work.join_next().await {
match result {
Ok(res) => {
results.push(res);
}
Err(_joinset_err) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
}
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
@@ -241,7 +266,7 @@ impl Planner {
let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
self.tasks
.push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
self.shard,
*self.timeline.get_shard_identity(),
start_key..end_key,
&file.path,
self.storage.clone(),
@@ -264,7 +289,7 @@ impl Planner {
}
async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
assert!(self.shard.is_shard_zero());
assert!(self.timeline.tenant_shard_id.is_shard_zero());
let segments = self.storage.listfilesindir(path).await?;
let segments: Vec<(String, u32, usize)> = segments
@@ -319,68 +344,6 @@ impl Planner {
}
}
impl Plan {
async fn execute(
self,
timeline: Arc<Timeline>,
import_config: &TimelineImportConfig,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut work = FuturesOrdered::new();
let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
let jobs_in_plan = self.jobs.len();
let mut jobs = self.jobs.into_iter().enumerate().peekable();
let mut results = Vec::new();
// Run import jobs concurrently up to the limit specified by the pageserver configuration.
// Note that we process completed futures in the oreder of insertion. This will be the
// building block for resuming imports across pageserver restarts or tenant migrations.
while results.len() < jobs_in_plan {
tokio::select! {
permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
let permit = permit.expect("never closed");
let (job_idx, job) = jobs.next().expect("we peeked");
let job_timeline = timeline.clone();
let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
work.push_back(tokio::task::spawn(async move {
let _permit = permit;
let res = job.run(job_timeline, &ctx).await;
(job_idx, res)
}));
},
maybe_complete_job_idx = work.next() => {
match maybe_complete_job_idx {
Some(Ok((_job_idx, res))) => {
results.push(res);
},
Some(Err(_)) => {
results.push(Err(anyhow::anyhow!(
"parallel job panicked or cancelled, check pageserver logs"
)));
}
None => {}
}
}
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let mut msg = String::new();
for result in results {
if let Err(err) = result {
msg.push_str(&format!("{err:?}\n\n"));
}
}
bail!("Some parallel jobs failed:\n\n{msg}");
}
}
}
//
// dbdir iteration tools
//
@@ -750,6 +713,7 @@ impl From<ImportSlruBlocksTask> for AnyImportTask {
}
struct ChunkProcessingJob {
timeline: Arc<Timeline>,
range: Range<Key>,
tasks: Vec<AnyImportTask>,
@@ -757,24 +721,25 @@ struct ChunkProcessingJob {
}
impl ChunkProcessingJob {
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
assert!(pgdata_lsn.is_valid());
fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
assert!(env.pgdata_lsn.is_valid());
Self {
timeline: env.timeline.clone(),
range,
tasks,
pgdata_lsn,
pgdata_lsn: env.pgdata_lsn,
}
}
async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
let mut writer = ImageLayerWriter::new(
timeline.conf,
timeline.timeline_id,
timeline.tenant_shard_id,
self.timeline.conf,
self.timeline.timeline_id,
self.timeline.tenant_shard_id,
&self.range,
self.pgdata_lsn,
&timeline.gate,
timeline.cancel.clone(),
&self.timeline.gate,
self.timeline.cancel.clone(),
ctx,
)
.await?;
@@ -786,20 +751,24 @@ impl ChunkProcessingJob {
let resident_layer = if nimages > 0 {
let (desc, path) = writer.finish(ctx).await?;
Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
} else {
// dropping the writer cleans up
return Ok(());
};
// this is sharing the same code as create_image_layers
let mut guard = timeline.layers.write().await;
let mut guard = self.timeline.layers.write().await;
guard
.open_mut()?
.track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
.track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
crate::tenant::timeline::drop_wlock(guard);
timeline
// Schedule the layer for upload but don't add barriers such as
// wait for completion or index upload, so we don't inhibit upload parallelism.
// TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
// TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
self.timeline
.remote_client
.schedule_layer_file_upload(resident_layer)?;

View File

@@ -63,7 +63,6 @@ pub struct WalReceiver {
/// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token.
/// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
cancel: CancellationToken,
task: tokio::task::JoinHandle<()>,
}
impl WalReceiver {
@@ -80,7 +79,7 @@ impl WalReceiver {
let loop_status = Arc::new(std::sync::RwLock::new(None));
let manager_status = Arc::clone(&loop_status);
let cancel = timeline.cancel.child_token();
let task = WALRECEIVER_RUNTIME.spawn({
WALRECEIVER_RUNTIME.spawn({
let cancel = cancel.clone();
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -121,25 +120,14 @@ impl WalReceiver {
Self {
manager_status,
cancel,
task,
}
}
#[instrument(skip_all, level = tracing::Level::DEBUG)]
pub async fn shutdown(self) {
pub fn cancel(&self) {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("cancelling walreceiver tasks");
self.cancel.cancel();
match self.task.await {
Ok(()) => debug!("Shutdown success"),
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// already logged by panic hook
}
Err(je) => {
error!("shutdown walreceiver task join error: {je}")
}
}
}
pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {

View File

@@ -111,17 +111,13 @@ pub(crate) fn get() -> IoEngine {
use std::os::unix::prelude::FileExt;
use std::sync::atomic::{AtomicU8, Ordering};
#[cfg(target_os = "linux")]
use {std::time::Duration, tracing::info};
use super::owned_buffers_io::io_buf_ext::FullSlice;
use super::owned_buffers_io::slice::SliceMutExt;
use super::{FileGuard, Metadata};
#[cfg(target_os = "linux")]
pub(super) fn epoll_uring_error_to_std(
e: tokio_epoll_uring::Error<std::io::Error>,
) -> std::io::Error {
fn epoll_uring_error_to_std(e: tokio_epoll_uring::Error<std::io::Error>) -> std::io::Error {
match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
@@ -153,11 +149,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) =
retry_ecanceled_once((file_guard, slice), |(file_guard, slice)| async {
system.read(file_guard, offset, slice).await
})
.await;
let (resources, res) = system.read(file_guard, offset, slice).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -172,10 +164,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.fsync(file_guard).await
})
.await;
let (resources, res) = system.fsync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -193,10 +182,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.fdatasync(file_guard).await
})
.await;
let (resources, res) = system.fdatasync(file_guard).await;
(resources, res.map_err(epoll_uring_error_to_std))
}
}
@@ -215,10 +201,7 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let (resources, res) = retry_ecanceled_once(file_guard, |file_guard| async {
system.statx(file_guard).await
})
.await;
let (resources, res) = system.statx(file_guard).await;
(
resources,
res.map_err(epoll_uring_error_to_std).map(Metadata::from),
@@ -241,7 +224,6 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
// TODO: ftruncate op for tokio-epoll-uring
// Don't forget to use retry_ecanceled_once
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
(file_guard, res)
}
@@ -263,11 +245,8 @@ impl IoEngine {
#[cfg(target_os = "linux")]
IoEngine::TokioEpollUring => {
let system = tokio_epoll_uring_ext::thread_local_system().await;
let ((file_guard, slice), res) = retry_ecanceled_once(
(file_guard, buf.into_raw_slice()),
async |(file_guard, buf)| system.write(file_guard, offset, buf).await,
)
.await;
let ((file_guard, slice), res) =
system.write(file_guard, offset, buf.into_raw_slice()).await;
(
(file_guard, FullSlice::must_new(slice)),
res.map_err(epoll_uring_error_to_std),
@@ -303,56 +282,6 @@ impl IoEngine {
}
}
/// We observe in tests that stop pageserver with SIGTERM immediately after it was ingesting data,
/// occasionally buffered writers fail (and get retried by BufferedWriter) with ECANCELED.
/// The problem is believed to be a race condition in how io_uring handles punted async work (io-wq) and signals.
/// Investigation ticket: <https://github.com/neondatabase/neon/issues/11446>
///
/// This function retries the operation once if it fails with ECANCELED.
/// ONLY USE FOR IDEMPOTENT [`super::VirtualFile`] operations.
#[cfg(target_os = "linux")]
pub(super) async fn retry_ecanceled_once<F, Fut, T, V>(
resources: T,
f: F,
) -> (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)
where
F: Fn(T) -> Fut,
Fut: std::future::Future<Output = (T, Result<V, tokio_epoll_uring::Error<std::io::Error>>)>,
T: Send,
V: Send,
{
let (resources, res) = f(resources).await;
let Err(e) = res else {
return (resources, res);
};
let tokio_epoll_uring::Error::Op(err) = e else {
return (resources, Err(e));
};
if err.raw_os_error() != Some(nix::libc::ECANCELED) {
return (resources, Err(tokio_epoll_uring::Error::Op(err)));
}
{
static RATE_LIMIT: std::sync::Mutex<utils::rate_limit::RateLimit> =
std::sync::Mutex::new(utils::rate_limit::RateLimit::new(Duration::from_secs(1)));
let mut guard = RATE_LIMIT.lock().unwrap();
guard.call2(|rate_limit_stats| {
info!(
%rate_limit_stats, "ECANCELED observed, assuming it is due to a signal being received by the submitting thread, retrying after a delay; this message is rate-limited"
);
});
drop(guard);
}
tokio::time::sleep(Duration::from_millis(100)).await; // something big enough to beat even heavily overcommitted CI runners
let (resources, res) = f(resources).await;
(resources, res)
}
pub(super) fn panic_operation_must_be_idempotent() {
panic!(
"unsupported; io_engine may retry operations internally and thus needs them to be idempotent (retry_ecanceled_once)"
)
}
pub enum FeatureTestResult {
PlatformPreferred(IoEngineKind),
Worse {

View File

@@ -110,23 +110,18 @@ impl OpenOptions {
self
}
/// Don't use, `O_APPEND` is not supported.
pub fn append(&mut self, _append: bool) {
super::io_engine::panic_operation_must_be_idempotent();
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match &self.inner {
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
Inner::TokioEpollUring(x) => {
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
let (_, res) = super::io_engine::retry_ecanceled_once((), |()| async {
let res = system.open(path, x).await;
((), res)
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})
.await;
res.map_err(super::io_engine::epoll_uring_error_to_std)
}
}
}
@@ -145,9 +140,6 @@ impl OpenOptions {
}
pub fn custom_flags(mut self, flags: i32) -> Self {
if flags & nix::libc::O_APPEND != 0 {
super::io_engine::panic_operation_must_be_idempotent();
}
match &mut self.inner {
Inner::StdFs(x) => {
let _ = x.custom_flags(flags);

View File

@@ -247,19 +247,6 @@ pub enum FlushTaskError {
Cancelled,
}
impl FlushTaskError {
pub fn is_cancel(&self) -> bool {
match self {
FlushTaskError::Cancelled => true,
}
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
FlushTaskError::Cancelled => anyhow::anyhow!(self),
}
}
}
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBufAligned + Send + Sync,

View File

@@ -12,9 +12,9 @@ use tracing::{debug, warn};
use crate::auth::password_hack::parse_endpoint_param;
use crate::context::RequestContext;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, SniGroup, SniKind};
use crate::metrics::{Metrics, SniKind};
use crate::proxy::NeonOptions;
use crate::serverless::{AUTH_BROKER_SNI, SERVERLESS_DRIVER_SNI};
use crate::serverless::SERVERLESS_DRIVER_SNI;
use crate::types::{EndpointId, RoleName};
#[derive(Debug, Error, PartialEq, Eq, Clone)]
@@ -65,7 +65,7 @@ pub(crate) fn endpoint_sni(sni: &str, common_names: &HashSet<String>) -> Option<
if !common_names.contains(common_name) {
return None;
}
if subdomain == SERVERLESS_DRIVER_SNI || subdomain == AUTH_BROKER_SNI {
if subdomain == SERVERLESS_DRIVER_SNI {
return None;
}
Some(EndpointId::from(subdomain))
@@ -128,23 +128,22 @@ impl ComputeUserInfoMaybeEndpoint {
let metrics = Metrics::get();
debug!(%user, "credentials");
let protocol = ctx.protocol();
let kind = if sni.is_some() {
if sni.is_some() {
debug!("Connection with sni");
SniKind::Sni
metrics.proxy.accepted_connections_by_sni.inc(SniKind::Sni);
} else if endpoint.is_some() {
metrics
.proxy
.accepted_connections_by_sni
.inc(SniKind::NoSni);
debug!("Connection without sni");
SniKind::NoSni
} else {
metrics
.proxy
.accepted_connections_by_sni
.inc(SniKind::PasswordHack);
debug!("Connection with password hack");
SniKind::PasswordHack
};
metrics
.proxy
.accepted_connections_by_sni
.inc(SniGroup { protocol, kind });
}
let options = NeonOptions::parse_params(params);

View File

@@ -115,8 +115,8 @@ pub struct ProxyMetrics {
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
pub allowed_vpc_endpoint_ids: Histogram<10>,
/// Number of connections, by the method we used to determine the endpoint.
pub accepted_connections_by_sni: CounterVec<SniSet>,
/// Number of connections (per sni).
pub accepted_connections_by_sni: CounterVec<StaticLabelSet<SniKind>>,
/// Number of connection failures (per kind).
pub connection_failures_total: CounterVec<StaticLabelSet<ConnectionFailureKind>>,
@@ -342,20 +342,11 @@ pub enum LatencyExclusions {
ClientCplaneComputeRetry,
}
#[derive(LabelGroup)]
#[label(set = SniSet)]
pub struct SniGroup {
pub protocol: Protocol,
pub kind: SniKind,
}
#[derive(FixedCardinalityLabel, Copy, Clone)]
#[label(singleton = "kind")]
pub enum SniKind {
/// Domain name based routing. SNI for libpq/websockets. Host for HTTP
Sni,
/// Metadata based routing. `options` for libpq/websockets. Header for HTTP
NoSni,
/// Metadata based routing, using the password field.
PasswordHack,
}

View File

@@ -56,7 +56,6 @@ use crate::serverless::backend::PoolingBackend;
use crate::serverless::http_util::{api_error_into_response, json_response};
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
pub async fn task_main(
config: &'static ProxyConfig,

View File

@@ -38,7 +38,7 @@ use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::{ReadBodyError, read_body_with_limit};
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{NeonOptions, run_until_cancelled};
use crate::serverless::backend::HttpConnError;
use crate::types::{DbName, RoleName};
@@ -227,32 +227,6 @@ fn get_conn_info(
}
}
// check the URL that was used, for metrics
{
let host_endpoint = headers
// get the host header
.get("host")
// extract the domain
.and_then(|h| {
let (host, _port) = h.to_str().ok()?.split_once(':')?;
Some(host)
})
// get the endpoint prefix
.map(|h| h.split_once('.').map_or(h, |(prefix, _)| prefix));
let kind = if host_endpoint == Some(&*endpoint) {
SniKind::Sni
} else {
SniKind::NoSni
};
let protocol = ctx.protocol();
Metrics::get()
.proxy
.accepted_connections_by_sni
.inc(SniGroup { protocol, kind });
}
ctx.set_user_agent(
headers
.get(hyper::header::USER_AGENT)

View File

@@ -121,20 +121,6 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn switch_timeline_membership(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &models::TimelineMembershipSwitchRequest,
) -> Result<models::TimelineMembershipSwitchResponse> {
let uri = format!(
"{}/v1/tenant/{}/timeline/{}/membership",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self.put(&uri, req).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self

View File

@@ -243,7 +243,8 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
let resp =
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
.await?;
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, resp)
}

View File

@@ -7,7 +7,6 @@ use bytes::Bytes;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use reqwest::Certificate;
use safekeeper_api::Term;
@@ -31,7 +30,7 @@ use utils::pausable_failpoint;
use crate::control_file::CONTROL_FILE_NAME;
use crate::state::{EvictionState, TimelinePersistentState};
use crate::timeline::{Timeline, TimelineError, WalResidentTimeline};
use crate::timeline::{Timeline, WalResidentTimeline};
use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline};
use crate::wal_storage::open_wal_file;
use crate::{GlobalTimelines, debug_dump, wal_backup};
@@ -396,7 +395,7 @@ pub async fn handle_request(
sk_auth_token: Option<SecretString>,
ssl_ca_certs: Vec<Certificate>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<PullTimelineResponse, ApiError> {
) -> Result<PullTimelineResponse> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
@@ -412,9 +411,7 @@ pub async fn handle_request(
for ssl_ca_cert in ssl_ca_certs {
http_client = http_client.add_root_certificate(ssl_ca_cert);
}
let http_client = http_client
.build()
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let http_client = http_client.build()?;
let http_hosts = request.http_hosts.clone();
@@ -446,10 +443,10 @@ pub async fn handle_request(
// offline and C comes online. Then we want a pull on C with A and B as hosts to work.
let min_required_successful = (http_hosts.len() - 1).max(1);
if statuses.len() < min_required_successful {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
bail!(
"only got {} successful status responses. required: {min_required_successful}",
statuses.len()
)));
)
}
// Find the most advanced safekeeper
@@ -468,7 +465,7 @@ pub async fn handle_request(
assert!(status.tenant_id == request.tenant_id);
assert!(status.timeline_id == request.timeline_id);
match pull_timeline(
pull_timeline(
status,
safekeeper_host,
sk_auth_token,
@@ -476,21 +473,6 @@ pub async fn handle_request(
global_timelines,
)
.await
{
Ok(resp) => Ok(resp),
Err(e) => {
match e.downcast_ref::<TimelineError>() {
Some(TimelineError::AlreadyExists(_)) => Ok(PullTimelineResponse {
safekeeper_host: None,
}),
Some(TimelineError::CreationInProgress(_)) => {
// We don't return success here because creation might still fail.
Err(ApiError::Conflict("Creation in progress".to_owned()))
}
_ => Err(ApiError::InternalServerError(e)),
}
}
}
}
async fn pull_timeline(

View File

@@ -98,23 +98,6 @@ impl SafekeeperClient {
)
}
#[allow(unused)]
pub(crate) async fn switch_timeline_membership(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &models::TimelineMembershipSwitchRequest,
) -> Result<models::TimelineMembershipSwitchResponse> {
measured_request!(
"switch_timeline_membership",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.switch_timeline_membership(tenant_id, timeline_id, req)
.await
)
}
pub(crate) async fn delete_tenant(
&self,
tenant_id: TenantId,

View File

@@ -8485,7 +8485,7 @@ impl Service {
// By default, live migrations are generous about the wait time for getting
// the secondary location up to speed. When draining, give up earlier in order
// to not stall the operation when a cold secondary is encountered.
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
@@ -8818,7 +8818,7 @@ impl Service {
node_id: NodeId,
cancel: CancellationToken,
) -> Result<(), OperationError> {
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)

View File

@@ -165,17 +165,16 @@ pub(crate) async fn branch_cleanup_and_check_errors(
.head_object(&path, &CancellationToken::new())
.await;
if let Err(e) = response {
if response.is_err() {
// Object is not present.
let is_l0 = LayerMap::is_l0(layer.key_range(), layer.is_delta());
let msg = format!(
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {}) with error: {}",
"index_part.json contains a layer {}{} (shard {}) that is not present in remote storage (layer_is_l0: {})",
layer,
metadata.generation.get_suffix(),
metadata.shard,
is_l0,
e,
);
if is_l0 || ignore_error {

View File

@@ -137,10 +137,11 @@ struct TenantRefAccumulator {
impl TenantRefAccumulator {
fn update(&mut self, ttid: TenantShardTimelineId, index_part: &IndexPart) {
let this_shard_idx = ttid.tenant_shard_id.to_index();
self.shards_seen
(*self
.shards_seen
.entry(ttid.tenant_shard_id.tenant_id)
.or_default()
.insert(this_shard_idx);
.or_default())
.insert(this_shard_idx);
let mut ancestor_refs = Vec::new();
for (layer_name, layer_metadata) in &index_part.layer_metadata {
@@ -766,13 +767,10 @@ pub async fn pageserver_physical_gc(
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
);
Ok(try_stream! {
let mut cnt = 0;
while let Some(ttid_res) = timelines.next().await {
let ttid = ttid_res?;
cnt += 1;
yield (ttid, tenant_manifest_arc.clone());
}
tracing::info!(%tenant_shard_id, "Found {} timelines", cnt);
})
}
});
@@ -792,7 +790,6 @@ pub async fn pageserver_physical_gc(
&accumulator,
tenant_manifest_arc,
)
.instrument(info_span!("gc_timeline", %ttid))
});
let timelines = timelines.try_buffered(CONCURRENCY);
let mut timelines = std::pin::pin!(timelines);

View File

@@ -153,10 +153,7 @@ pub async fn scan_pageserver_metadata(
const CONCURRENCY: usize = 32;
// Generate a stream of TenantTimelineId
let timelines = tenants.map_ok(|t| {
tracing::info!("Found tenant: {}", t);
stream_tenant_timelines(&remote_client, &target, t)
});
let timelines = tenants.map_ok(|t| stream_tenant_timelines(&remote_client, &target, t));
let timelines = timelines.try_buffered(CONCURRENCY);
let timelines = timelines.try_flatten();

View File

@@ -24,6 +24,7 @@ pub struct SnapshotDownloader {
remote_client: GenericRemoteStorage,
#[allow(dead_code)]
target: RootTarget,
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
@@ -42,6 +43,7 @@ impl SnapshotDownloader {
Ok(Self {
remote_client,
target,
bucket_config,
tenant_id,
output_path,
concurrency,
@@ -216,9 +218,11 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (remote_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
// Generate a stream of TenantShardId
let shards =
stream_tenant_shards(&self.remote_client, &self.target, self.tenant_id).await?;
let shards = stream_tenant_shards(&remote_client, &target, self.tenant_id).await?;
let shards: Vec<TenantShardId> = shards.try_collect().await?;
// Only read from shards that have the highest count: avoids redundantly downloading
@@ -236,8 +240,7 @@ impl SnapshotDownloader {
for shard in shards.into_iter().filter(|s| s.shard_count == shard_count) {
// Generate a stream of TenantTimelineId
let timelines =
stream_tenant_timelines(&self.remote_client, &self.target, shard).await?;
let timelines = stream_tenant_timelines(&remote_client, &target, shard).await?;
// Generate a stream of S3TimelineBlobData
async fn load_timeline_index(
@@ -248,8 +251,8 @@ impl SnapshotDownloader {
let data = list_timeline_blobs(remote_client, ttid, target).await?;
Ok((ttid, data))
}
let timelines = timelines
.map_ok(|ttid| load_timeline_index(&self.remote_client, &self.target, ttid));
let timelines =
timelines.map_ok(|ttid| load_timeline_index(&remote_client, &target, ttid));
let mut timelines = std::pin::pin!(timelines.try_buffered(8));
while let Some(i) = timelines.next().await {

View File

@@ -1299,6 +1299,13 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
if self.pageserver_virtual_file_io_mode is not None:
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
if not config.test_may_use_compatibility_snapshot_binaries:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
else:
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
self.pageserver_wal_receiver_protocol
@@ -1402,6 +1409,30 @@ class NeonEnv:
for f in futs:
f.result()
# Last step: register safekeepers at the storage controller
if (
self.storage_controller_config is not None
and self.storage_controller_config.get("timelines_onto_safekeepers") is True
):
for sk_id, sk in enumerate(self.safekeepers):
# 0 is an invalid safekeeper id
sk_id = sk_id + 1
body = {
"id": sk_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "127.0.0.1",
"port": sk.port.pg,
"http_port": sk.port.http,
"https_port": None,
"version": 5957,
"availability_zone_id": f"us-east-2b-{sk_id}",
}
self.storage_controller.on_safekeeper_deploy(sk_id, body)
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
@@ -3835,7 +3866,7 @@ class NeonAuthBroker:
external_http_port: int,
auth_backend: NeonAuthBroker.ProxyV1,
):
self.domain = "local.neon.build" # resolves to 127.0.0.1
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
self.host = "127.0.0.1"
self.http_port = http_port
self.external_http_port = external_http_port
@@ -3852,7 +3883,7 @@ class NeonAuthBroker:
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs(f"apiauth.{self.domain}", key_path, crt_path)
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -3896,10 +3927,10 @@ class NeonAuthBroker:
log.info(f"Executing http query: {query}")
connstr = f"postgresql://{user}@ep-foo-bar-1234.{self.domain}/postgres"
connstr = f"postgresql://{user}@{self.domain}/postgres"
async with httpx.AsyncClient(verify=str(self.test_output_dir / "proxy.crt")) as client:
response = await client.post(
f"https://apiauth.{self.domain}:{self.external_http_port}/sql",
f"https://{self.domain}:{self.external_http_port}/sql",
json={"query": query, "params": args},
headers={
"Neon-Connection-String": connstr,
@@ -5446,13 +5477,6 @@ def wait_for_last_flush_lsn(
if last_flush_lsn is None:
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
# The last_flush_lsn may not correspond to a record boundary.
# For example, if the compute flushed WAL on a page boundary,
# the remaining part of the record might not be flushed for a long time.
# This would prevent the pageserver from reaching last_flush_lsn promptly.
# To ensure the rest of the record reaches the pageserver quickly,
# we forcibly flush the WAL by using CHECKPOINT.
endpoint.safe_psql("CHECKPOINT")
results = []
for tenant_shard_id, pageserver in shards:

View File

@@ -122,10 +122,6 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*Call to node.*management API.*failed.*Timeout.*",
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
# Many tests will take safekeepers offline
".*Call to safekeeper.*management API.*failed.*receive body.*",
".*Call to safekeeper.*management API.*failed.*ReceiveBody.*",
".*Call to safekeeper.*management API.*failed.*Timeout.*",
# Many tests will start up with a node offline
".*startup_reconcile: Could not scan node.*",
# Tests run in dev mode

View File

@@ -544,69 +544,3 @@ def test_drop_role_with_table_privileges_from_non_neon_superuser(neon_simple_env
)
role = cursor.fetchone()
assert role is None
def test_db_with_custom_settings(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can work with databases that have some custom settings.
For example, role=some_other_role, default_transaction_read_only=on,
search_path=non_public_schema, statement_timeout=1 (1ms).
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
TEST_ROLE = "some_other_role"
TEST_DB = "db_with_custom_settings"
TEST_SCHEMA = "non_public_schema"
endpoint.respec_deep(
**{
"spec": {
"skip_pg_catalog_updates": False,
"cluster": {
"databases": [
{
"name": TEST_DB,
"owner": TEST_ROLE,
}
],
"roles": [
{
"name": TEST_ROLE,
}
],
},
}
}
)
endpoint.reconfigure()
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute(f"CREATE SCHEMA {TEST_SCHEMA}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET role = {TEST_ROLE}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET default_transaction_read_only = on")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET search_path = {TEST_SCHEMA}")
cursor.execute(f"ALTER DATABASE {TEST_DB} SET statement_timeout = 1")
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute("SELECT current_role")
role = cursor.fetchone()
assert role is not None
assert role[0] == TEST_ROLE
cursor.execute("SHOW default_transaction_read_only")
default_transaction_read_only = cursor.fetchone()
assert default_transaction_read_only is not None
assert default_transaction_read_only[0] == "on"
cursor.execute("SHOW search_path")
search_path = cursor.fetchone()
assert search_path is not None
assert search_path[0] == TEST_SCHEMA
# Do not check statement_timeout, because we force it to 2min
# in `endpoint.cursor()` fixture.
endpoint.reconfigure()

View File

@@ -641,55 +641,6 @@ def test_fast_import_binary(
assert res[0][0] == 10
def test_fast_import_event_triggers(
test_output_dir,
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
fast_import: FastImport,
):
vanilla_pg.start()
vanilla_pg.safe_psql("""
CREATE FUNCTION test_event_trigger_for_drops()
RETURNS event_trigger LANGUAGE plpgsql AS $$
DECLARE
obj record;
BEGIN
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects()
LOOP
RAISE NOTICE '% dropped object: % %.% %',
tg_tag,
obj.object_type,
obj.schema_name,
obj.object_name,
obj.object_identity;
END LOOP;
END
$$;
CREATE EVENT TRIGGER test_event_trigger_for_drops
ON sql_drop
EXECUTE PROCEDURE test_event_trigger_for_drops();
""")
pg_port = port_distributor.get_port()
p = fast_import.run_pgdata(pg_port=pg_port, source_connection_string=vanilla_pg.connstr())
assert p.returncode == 0
vanilla_pg.stop()
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
with VanillaPostgres(
fast_import.workdir / "pgdata", pgbin, pg_port, False
) as new_pgdata_vanilla_pg:
new_pgdata_vanilla_pg.start()
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
res = conn.safe_psql("SELECT count(*) FROM pg_event_trigger;")
log.info(f"Result: {res}")
assert res[0][0] == 0, f"Neon does not support importing event triggers, got: {res[0][0]}"
def test_fast_import_restore_to_connstring(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -1822,7 +1822,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
endpoint2.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
)
lsn3 = wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
lsn3 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
@@ -1839,7 +1839,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
assert all_reparented == set([])
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
), "main branch unaffected"

View File

@@ -1,7 +1,7 @@
{
"v17": [
"17.4",
"0d59c91c1a23e667f1d1169d5f040b3fa0a0ab44"
"eab3a37834cac6ec0719bf817ac918a201712d66"
],
"v16": [
"16.8",
@@ -9,10 +9,10 @@
],
"v15": [
"15.12",
"72f83df76c61ce18d81bd371f0afd2a43d59c052"
"b838c8969b7c63f3e637a769656f5f36793b797c"
],
"v14": [
"14.17",
"06b405bc982fd53522689aa4acbfd9c44b7993cf"
"c8dab02bfc003ae7bd59096919042d7840f3c194"
]
}