Compare commits

..

1 Commits

Author SHA1 Message Date
John Spray
7f3670a589 pageserver: don't log deletion S3 op failures as errors 2023-10-11 14:26:01 +01:00
16 changed files with 275 additions and 447 deletions

View File

@@ -224,8 +224,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
FROM build-deps AS vector-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \

View File

@@ -4,7 +4,7 @@
//! allowing multiple api users to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::{borrow::Cow, sync::Arc};
use std::sync::Arc;
use anyhow::Context;
use aws_config::{
@@ -556,20 +556,6 @@ impl RemoteStorage for S3Bucket {
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
// Log a bounded number of the errors within the response:
// these requests can carry 1000 keys so logging each one
// would be too verbose, especially as errors may lead us
// to retry repeatedly.
const LOG_UP_TO_N_ERRORS: usize = 10;
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
tracing::warn!(
"DeleteObjects key {} failed: {}: {}",
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
e.message.as_ref().map(Cow::from).unwrap_or("".into())
);
}
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()

View File

@@ -12,8 +12,6 @@ use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use utils::backoff;
use crate::metrics;
@@ -64,19 +62,7 @@ impl Deleter {
Err(anyhow::anyhow!("failpoint hit"))
});
// A backoff::retry is used here for two reasons:
// - To provide a backoff rather than busy-polling the API on errors
// - To absorb transient 429/503 conditions without hitting our error
// logging path for issues deleting objects.
backoff::retry(
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|_| false,
3,
10,
"executing deletion batch",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
self.remote_storage.delete_objects(&self.accumulator).await
}
/// Block until everything in accumulator has been executed
@@ -101,10 +87,10 @@ impl Deleter {
self.accumulator.clear();
}
Err(e) => {
if self.cancel.is_cancelled() {
return Err(DeletionQueueError::ShuttingDown);
}
warn!("DeleteObjects request failed: {e:#}, will continue trying");
// The RemoteStorage interface doesn't discriminate between
// real errors and 503/429 responses, so we log at INFO level
// to avoid propagating spurious error-severity logs.
info!("DeleteObjects request failed: {e:#}, will retry");
metrics::DELETION_QUEUE
.remote_errors
.with_label_values(&["execute"])

View File

@@ -77,7 +77,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
.iter()
.map(|v| v.parse().unwrap())
.collect::<Vec<_>>();
@@ -164,6 +164,9 @@ impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::NotActive(_) => {
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}

View File

@@ -29,11 +29,11 @@ use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::process::Command;
use std::process::Stdio;
@@ -49,6 +49,7 @@ use self::config::AttachmentMode;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
use self::metadata::TimelineMetadata;
use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
@@ -208,7 +209,7 @@ pub struct Tenant {
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
@@ -372,13 +373,6 @@ struct RemoteStartupData {
remote_metadata: TimelineMetadata,
}
struct TimelinePreload {
timeline_id: TimelineId,
remote_client: Option<RemoteTimelineClient>,
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitToBecomeActiveError {
WillNotBecomeActive {
@@ -419,6 +413,11 @@ pub enum CreateTimelineError {
Other(#[from] anyhow::Error),
}
struct TenantDirectoryScan {
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
}
enum CreateTimelineCause {
Load,
Delete,
@@ -662,14 +661,41 @@ impl Tenant {
Ok(tenant)
}
fn download_indices(
&self,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
) -> JoinSet<Result<(TimelineId, RemoteTimelineClient, MaybeDeletedIndexPart), anyhow::Error>>
{
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
if !tokio::fs::try_exists(&marker_file)
.await
.context("check for existence of marker file")?
{
anyhow::bail!(
"implementation error: marker file should exist at beginning of this function"
);
}
// Get list of remote timelines
// download index files for every tenant timeline
info!("listing remote timelines");
let remote_storage = self
.remote_storage
.as_ref()
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
let remote_timeline_ids =
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
info!("found {} timelines", remote_timeline_ids.len());
// Download & parse index parts
let mut part_downloads = JoinSet::new();
for timeline_id in timeline_ids {
for timeline_id in remote_timeline_ids {
let client = RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
@@ -698,56 +724,11 @@ impl Tenant {
);
}
part_downloads
}
/// Special variant of preload_timelines that does not rely on remote storage
async fn preload_timelines_local(
self: &Arc<Self>,
timeline_ids: &HashSet<TimelineId>,
) -> anyhow::Result<Vec<TimelinePreload>> {
let mut preload_map = HashMap::new();
for timeline_id in timeline_ids {
let metadata = load_metadata(self.conf, &self.tenant_id, timeline_id)?;
preload_map.insert(
*timeline_id,
TimelinePreload {
timeline_id: *timeline_id,
remote_client: None,
// TODO: synthesize an index_part and make it non-optional
index_part: None,
metadata,
},
);
}
// Sort by ancestry
Ok(
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
.into_iter()
.map(|i| i.1)
.collect(),
)
}
/// Do the remote I/O and sorting required to prepare a list of timelines
/// with their IndexParts, ready for hydrating into `Timeline`
async fn preload_timelines(
self: &Arc<Self>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
) -> anyhow::Result<Vec<TimelinePreload>> {
span::debug_assert_current_span_has_tenant_id();
let mut part_downloads = self.download_indices(timeline_ids, remote_storage);
let mut timelines_to_resume_deletions = vec![];
// We construct a map all timeline's preload state, prior to sorting
// it by ancestry at the end of the function
let mut preload_map: HashMap<TimelineId, TimelinePreload> = HashMap::new();
// Wait for all the download tasks to complete & collect results.
let mut remote_index_and_client = HashMap::new();
let mut timeline_ancestors = HashMap::new();
while let Some(result) = part_downloads.join_next().await {
// NB: we already added timeline_id as context to the error
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
@@ -755,16 +736,8 @@ impl Tenant {
debug!("successfully downloaded index part for timeline {timeline_id}");
match index_part {
MaybeDeletedIndexPart::IndexPart(index_part) => {
let metadata = index_part.metadata.clone();
preload_map.insert(
timeline_id,
TimelinePreload {
timeline_id,
remote_client: Some(client),
index_part: Some(index_part),
metadata,
},
);
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
remote_index_and_client.insert(timeline_id, (index_part, client));
}
MaybeDeletedIndexPart::Deleted(index_part) => {
info!(
@@ -776,6 +749,35 @@ impl Tenant {
}
}
// For every timeline, download the metadata file, scan the local directory,
// and build a layer map that contains an entry for each remote and local
// layer file.
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
for (timeline_id, remote_metadata) in sorted_timelines {
let (index_part, remote_client) = remote_index_and_client
.remove(&timeline_id)
.expect("just put it in above");
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
TimelineResources {
remote_client: Some(remote_client),
deletion_queue_client: self.deletion_queue_client.clone(),
},
ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
}
// Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
remote_timeline_client
@@ -796,81 +798,6 @@ impl Tenant {
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
// Sort by ancestry
Ok(
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
.into_iter()
.map(|i| i.1)
.collect(),
)
}
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
/// No background tasks are started as part of this routine.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
if !tokio::fs::try_exists(&marker_file)
.await
.context("check for existence of marker file")?
{
anyhow::bail!(
"implementation error: marker file should exist at beginning of this function"
);
}
// Get list of remote timelines
info!("listing remote timelines");
let remote_storage = self
.remote_storage
.as_ref()
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
let remote_timeline_ids =
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
info!("found {} timelines", remote_timeline_ids.len());
// Download & parse index parts
let sorted_timelines = self
.preload_timelines(remote_timeline_ids, remote_storage)
.await?;
// For every timeline, download the metadata file, scan the local directory,
// and build a layer map that contains an entry for each remote and local
// layer file.
for timeline_preload in sorted_timelines {
let TimelinePreload {
timeline_id,
remote_client,
index_part,
metadata: _,
} = timeline_preload;
// TODO again handle early failure
self.load_remote_timeline(
timeline_id,
index_part.unwrap(),
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
},
ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
}
std::fs::remove_file(&marker_file)
.with_context(|| format!("unlink attach marker file {marker_file}"))?;
crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
@@ -903,6 +830,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
index_part: IndexPart,
remote_metadata: TimelineMetadata,
resources: TimelineResources,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -913,7 +841,7 @@ impl Tenant {
.await
.context("Failed to create new timeline directory")?;
let ancestor = if let Some(ancestor_id) = index_part.metadata.ancestor_timeline() {
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
let timelines = self.timelines.lock().unwrap();
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|| {
@@ -931,7 +859,6 @@ impl Tenant {
// cannot be older than the local one
let local_metadata = None;
let remote_metadata = index_part.metadata.clone();
self.timeline_init_and_sync(
timeline_id,
resources,
@@ -1105,9 +1032,12 @@ impl Tenant {
tenant
}
async fn scan_timelines_dir(self: &Arc<Tenant>) -> anyhow::Result<HashSet<TimelineId>> {
let mut timelines_to_load: HashSet<TimelineId> = HashSet::new();
let mut timelines_to_resume_deletion: HashSet<TimelineId> = HashSet::new();
fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
// Note timelines_to_resume_deletion needs to be separate because it can be not sortable
// from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
// completed in non topological order (for example because parent has smaller number of layer files in it)
let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
@@ -1156,7 +1086,38 @@ impl Tenant {
})?;
info!("Found deletion mark for timeline {}", timeline_id);
timelines_to_resume_deletion.insert(timeline_id);
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
Ok(metadata) => {
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
}
Err(e) => match &e {
LoadMetadataError::Read(r) => {
if r.kind() != io::ErrorKind::NotFound {
return Err(anyhow::anyhow!(e)).with_context(|| {
format!("Failed to load metadata for timeline_id {timeline_id}")
});
}
// If metadata doesnt exist it means that we've crashed without
// completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
// So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
// We cant do it here because the method is async so we'd need block_on
// and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
// so that basically results in a cycle:
// spawn_blocking
// - block_on
// - spawn_blocking
// which can lead to running out of threads in blocing pool.
timelines_to_resume_deletion.push((timeline_id, None));
}
_ => {
return Err(anyhow::anyhow!(e)).with_context(|| {
format!("Failed to load metadata for timeline_id {timeline_id}")
})
}
},
}
} else {
if !timeline_dir.exists() {
warn!("Timeline dir entry become invalid: {timeline_dir}");
@@ -1194,7 +1155,9 @@ impl Tenant {
let file_name = entry.file_name();
if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
timelines_to_load.insert(timeline_id);
let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!("unexpected file or directory in timelines directory: {file_name}");
@@ -1202,18 +1165,14 @@ impl Tenant {
}
}
for timeline_id in timelines_to_resume_deletion {
if let Err(e) =
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id).await
{
warn!(
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
timeline_id, e
);
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
TenantDirectoryScan {
sorted_timelines_to_load: sorted_timelines,
timelines_to_resume_deletion,
}
}
Ok(timelines_to_load)
})
}
///
@@ -1236,34 +1195,24 @@ impl Tenant {
//
// Scan the directory, peek into the metadata file of each timeline, and
// collect a list of timelines and their ancestors.
let span = info_span!("blocking");
let cloned = Arc::clone(self);
let local_timelines = tokio::task::spawn(async move { cloned.scan_timelines_dir().await })
.await
.context("load spawn_blocking")
.and_then(|res| res)?;
let scan = tokio::task::spawn_blocking(move || {
let _g = span.entered();
cloned.scan_and_sort_timelines_dir()
})
.await
.context("load spawn_blocking")
.and_then(|res| res)?;
let sorted_timelines = match &self.remote_storage {
Some(remote_storage) => {
self.preload_timelines(local_timelines, remote_storage)
.await?
}
None => {
// Deprecated mode, only used in dev.
self.preload_timelines_local(&local_timelines).await?
}
};
for timeline_preload in sorted_timelines {
let TimelinePreload {
timeline_id,
remote_client: _,
index_part: _,
metadata,
} = timeline_preload;
// FIXME original collect_timeline_files contained one more check:
// 1. "Timeline has no ancestor and no layer files"
// Process loadable timelines first
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
if let Err(e) = self
.load_local_timeline(timeline_id, metadata, init_order, ctx, false)
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
.await
{
match e {
@@ -1280,6 +1229,43 @@ impl Tenant {
}
}
// Resume deletion ones with deleted_mark
for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
match maybe_local_metadata {
None => {
// See comment in `scan_and_sort_timelines_dir`.
if let Err(e) =
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
.await
{
warn!(
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
timeline_id, e
);
}
}
Some(local_metadata) => {
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
.await
{
match e {
LoadLocalTimelineError::Load(source) => {
// We tried to load deleted timeline, this is a bug.
return Err(anyhow::anyhow!(source).context(
"This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
));
}
LoadLocalTimelineError::ResumeDeletion(source) => {
// Make sure resumed deletion wont fail loading for entire tenant.
error!("Failed to resume timeline deletion: {source:#}")
}
}
}
}
}
}
trace!("Done");
Ok(())
@@ -2769,11 +2755,6 @@ impl Tenant {
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
let start_lsn = start_lsn.unwrap_or_else(|| {
let lsn = src_timeline.get_last_record_lsn();
@@ -2781,6 +2762,11 @@ impl Tenant {
lsn
});
// First acquire the GC lock so that another task cannot advance the GC
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
// creating the branch.
let _gc_cs = self.gc_cs.lock().await;
// Create a placeholder for the new branch. This will error
// out if the new timeline ID is already in use.
let timeline_uninit_mark = {

View File

@@ -31,7 +31,7 @@ use super::{
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
pub enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
@@ -376,7 +376,7 @@ impl DeleteTenantFlow {
Ok(())
}
pub(crate) async fn should_resume_deletion(
pub async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,

View File

@@ -50,7 +50,7 @@ use super::TenantSharedResources;
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub(crate) enum TenantSlot {
pub enum TenantSlot {
Attached(Arc<Tenant>),
Secondary,
}
@@ -481,7 +481,7 @@ pub(crate) fn schedule_local_tenant_processing(
/// management API. For example, it could attach the tenant on a different pageserver.
/// We would then be in split-brain once this pageserver restarts.
#[instrument(skip_all)]
pub(crate) async fn shutdown_all_tenants() {
pub async fn shutdown_all_tenants() {
shutdown_all_tenants0(&TENANTS).await
}
@@ -593,7 +593,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
// caller will log how long we took
}
pub(crate) async fn create_tenant(
pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -628,14 +628,14 @@ pub(crate) async fn create_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum SetNewTenantConfigError {
pub enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
}
pub(crate) async fn set_new_tenant_config(
pub async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
@@ -776,7 +776,7 @@ pub(crate) async fn upsert_location(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum GetTenantError {
pub enum GetTenantError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is not active")]
@@ -792,7 +792,7 @@ pub(crate) enum GetTenantError {
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub(crate) async fn get_tenant(
pub async fn get_tenant(
tenant_id: TenantId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
@@ -817,7 +817,7 @@ pub(crate) async fn get_tenant(
}
}
pub(crate) async fn delete_tenant(
pub async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_id: TenantId,
@@ -826,7 +826,7 @@ pub(crate) async fn delete_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTimelineError {
pub enum DeleteTimelineError {
#[error("Tenant {0}")]
Tenant(#[from] GetTenantError),
@@ -834,7 +834,7 @@ pub(crate) enum DeleteTimelineError {
Timeline(#[from] crate::tenant::DeleteTimelineError),
}
pub(crate) async fn delete_timeline(
pub async fn delete_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
_ctx: &RequestContext,
@@ -845,16 +845,18 @@ pub(crate) async fn delete_timeline(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantStateError {
pub enum TenantStateError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is stopping")]
IsStopping(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
pub(crate) async fn detach_tenant(
pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
detach_ignored: bool,
@@ -924,7 +926,7 @@ async fn detach_tenant0(
removal_result
}
pub(crate) async fn load_tenant(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -961,7 +963,7 @@ pub(crate) async fn load_tenant(
Ok(())
}
pub(crate) async fn ignore_tenant(
pub async fn ignore_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
@@ -989,7 +991,7 @@ async fn ignore_tenant0(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapListError {
pub enum TenantMapListError {
#[error("tenant map is still initiailizing")]
Initializing,
}
@@ -997,7 +999,7 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().await;
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1015,7 +1017,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
///
/// Downloading all the tenant data is performed in the background, this merely
/// spawns the background task and returns quickly.
pub(crate) async fn attach_tenant(
pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
@@ -1052,7 +1054,7 @@ pub(crate) async fn attach_tenant(
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapInsertError {
pub enum TenantMapInsertError {
#[error("tenant map is still initializing")]
StillInitializing,
#[error("tenant map is shutting down")]
@@ -1215,7 +1217,7 @@ use {
utils::http::error::ApiError,
};
pub(crate) async fn immediate_gc(
pub async fn immediate_gc(
tenant_id: TenantId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,

View File

@@ -1,6 +1,5 @@
use futures::future::Either;
use proxy::auth;
use proxy::config::HttpConfig;
use proxy::console;
use proxy::http;
use proxy::metrics;
@@ -80,9 +79,6 @@ struct ProxyCliArgs {
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
/// timeout for http connections
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
}
#[tokio::main]
@@ -224,15 +220,12 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
auth::BackendType::Link(Cow::Owned(url))
}
};
let http_config = HttpConfig {
sql_over_http_timeout: args.sql_over_http_timeout,
};
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
}));
Ok(config)

View File

@@ -13,7 +13,6 @@ pub struct ProxyConfig {
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
}
#[derive(Debug)]
@@ -27,10 +26,6 @@ pub struct TlsConfig {
pub common_names: Option<HashSet<String>>,
}
pub struct HttpConfig {
pub sql_over_http_timeout: tokio::time::Duration,
}
impl TlsConfig {
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
self.config.clone()

View File

@@ -20,7 +20,6 @@ use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
};
use crate::{compute, config};
@@ -419,42 +418,36 @@ async fn connect_to_compute_once(
};
tokio::spawn(
async move {
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
}
loop {
let message = ready!(connection.poll_message(cx));
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session, "unknown message");
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
}
None => {
info!("connection closed");
return Poll::Ready(())
}
}
}).await
}
}
})
.instrument(span)
);

View File

@@ -24,9 +24,6 @@ use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use crate::config::HttpConfig;
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
@@ -50,6 +47,7 @@ enum Payload {
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15);
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -191,10 +189,9 @@ pub async fn handle(
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
config: &'static HttpConfig,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.sql_over_http_timeout,
HTTP_CONNECTION_TIMEOUT,
handle_inner(request, sni_hostname, conn_pool, session_id),
)
.await;
@@ -225,7 +222,7 @@ pub async fn handle(
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
config.sql_over_http_timeout.as_secs()
HTTP_CONNECTION_TIMEOUT.as_secs()
);
error!(message);
json_response(
@@ -248,13 +245,6 @@ async fn handle_inner(
conn_pool: Arc<GlobalConnPool>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&["http"])
.inc();
scopeguard::defer! {
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
//
// Determine the destination and connection params
//

View File

@@ -3,10 +3,7 @@ use crate::{
config::ProxyConfig,
error::io_error,
protocol2::{ProxyProtocolAccept, WithClientIp},
proxy::{
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
},
proxy::{handle_client, ClientMode},
};
use bytes::{Buf, Bytes};
use futures::{Sink, Stream, StreamExt};
@@ -205,14 +202,7 @@ async fn ws_handler(
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
sql_over_http::handle(
request,
sni_hostname,
conn_pool,
session_id,
&config.http_config,
)
.await
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
Response::builder()
.header("Allow", "OPTIONS, POST")
@@ -285,25 +275,23 @@ pub async fn task_main(
let conn_pool = conn_pool.clone();
async move {
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
},
)))
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
.instrument(info_span!(
"ws-client",
session = %session_id,
%peer_addr,
))
.await
}
}))
}
},
);
@@ -315,41 +303,3 @@ pub async fn task_main(
Ok(())
}
struct MetricService<S> {
inner: S,
}
impl<S> MetricService<S> {
fn new(inner: S) -> MetricService<S> {
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&["http"])
.inc();
MetricService { inner }
}
}
impl<S> Drop for MetricService<S> {
fn drop(&mut self) {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
.with_label_values(&["http"])
.inc();
}
}
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
where
S: hyper::service::Service<Request<ReqBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
self.inner.call(req)
}
}

View File

@@ -39,55 +39,19 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_db_connections_total",
"Number of opened connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_db_connections_total",
"Number of closed connections to a database.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_opened_client_connections_total",
"Number of opened connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_client_connections_total",
"Number of closed connections from a client.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_accepted_connections_total",
"Number of client connections accepted.",
"Number of TCP client connections accepted.",
&["protocol"],
)
.unwrap()
});
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_closed_connections_total",
"Number of client connections closed.",
"Number of TCP client connections closed.",
&["protocol"],
)
.unwrap()
@@ -254,16 +218,12 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
"handling interactive connection from client"
);
let proto = mode.protocol_label();
NUM_CLIENT_CONNECTION_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
// The `closed` counter will increase when this future is destroyed.
NUM_CONNECTIONS_ACCEPTED_COUNTER
.with_label_values(&[proto])
.with_label_values(&[mode.protocol_label()])
.inc();
scopeguard::defer! {
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
}
let tls = config.tls_config.as_ref();
@@ -298,7 +258,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
mode.allow_self_signed_compute(config),
);
cancel_map
.with_session(|session| client.connect_to_db(session, mode))
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
.await
}
@@ -774,7 +734,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
async fn connect_to_db(
self,
session: cancellation::Session<'_>,
mode: ClientMode,
allow_cleartext: bool,
) -> anyhow::Result<()> {
let Self {
mut stream,
@@ -790,7 +750,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
};
let auth_result = match creds
.authenticate(&extra, &mut stream, mode.allow_cleartext())
.authenticate(&extra, &mut stream, allow_cleartext)
.await
{
Ok(auth_result) => auth_result,
@@ -816,14 +776,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.or_else(|e| stream.throw_error(e))
.await?;
let proto = mode.protocol_label();
NUM_DB_CONNECTIONS_OPENED_COUNTER
.with_label_values(&[proto])
.inc();
scopeguard::defer! {
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
}
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm

View File

@@ -374,12 +374,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
#[allow(clippy::mutable_key_type)]
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
["/v1/status", "/metrics"]
.iter()
.map(|v| v.parse().unwrap())
.collect()
});
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
if ALLOWLIST_ROUTES.contains(request.uri()) {
None
} else {

View File

@@ -65,7 +65,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
def start_single_table_workload(table_id: int):
for _ in range(num_iters):
with env.pg.connect(options="-cstatement_timeout=300s").cursor() as cur:
with env.pg.connect().cursor() as cur:
cur.execute(
f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
)

View File

@@ -6,7 +6,6 @@ from pathlib import Path
from typing import List, Optional
import asyncpg
import pytest
import toml
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
@@ -598,10 +597,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
assert res == expected_sum
# Do inserts while restarting postgres and messing with safekeeper addresses.
# The test takes more than default 5 minutes on Postgres 16,
# see https://github.com/neondatabase/neon/issues/5305
@pytest.mark.timeout(600)
# do inserts while restarting postgres and messing with safekeeper addresses
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()