mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
Compare commits
1 Commits
jcsp/issue
...
jcsp/downg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7f3670a589 |
@@ -224,8 +224,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
|
||||
FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
|
||||
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
|
||||
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! allowing multiple api users to independently work with the same S3 bucket, if
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::{
|
||||
@@ -556,20 +556,6 @@ impl RemoteStorage for S3Bucket {
|
||||
.deleted_objects_total
|
||||
.inc_by(chunk.len() as u64);
|
||||
if let Some(errors) = resp.errors {
|
||||
// Log a bounded number of the errors within the response:
|
||||
// these requests can carry 1000 keys so logging each one
|
||||
// would be too verbose, especially as errors may lead us
|
||||
// to retry repeatedly.
|
||||
const LOG_UP_TO_N_ERRORS: usize = 10;
|
||||
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
|
||||
tracing::warn!(
|
||||
"DeleteObjects key {} failed: {}: {}",
|
||||
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.message.as_ref().map(Cow::from).unwrap_or("".into())
|
||||
);
|
||||
}
|
||||
|
||||
return Err(anyhow::format_err!(
|
||||
"Failed to delete {} objects",
|
||||
errors.len()
|
||||
|
||||
@@ -12,8 +12,6 @@ use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use utils::backoff;
|
||||
|
||||
use crate::metrics;
|
||||
|
||||
@@ -64,19 +62,7 @@ impl Deleter {
|
||||
Err(anyhow::anyhow!("failpoint hit"))
|
||||
});
|
||||
|
||||
// A backoff::retry is used here for two reasons:
|
||||
// - To provide a backoff rather than busy-polling the API on errors
|
||||
// - To absorb transient 429/503 conditions without hitting our error
|
||||
// logging path for issues deleting objects.
|
||||
backoff::retry(
|
||||
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|
||||
|_| false,
|
||||
3,
|
||||
10,
|
||||
"executing deletion batch",
|
||||
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
|
||||
)
|
||||
.await
|
||||
self.remote_storage.delete_objects(&self.accumulator).await
|
||||
}
|
||||
|
||||
/// Block until everything in accumulator has been executed
|
||||
@@ -101,10 +87,10 @@ impl Deleter {
|
||||
self.accumulator.clear();
|
||||
}
|
||||
Err(e) => {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
warn!("DeleteObjects request failed: {e:#}, will continue trying");
|
||||
// The RemoteStorage interface doesn't discriminate between
|
||||
// real errors and 503/429 responses, so we log at INFO level
|
||||
// to avoid propagating spurious error-severity logs.
|
||||
info!("DeleteObjects request failed: {e:#}, will retry");
|
||||
metrics::DELETION_QUEUE
|
||||
.remote_errors
|
||||
.with_label_values(&["execute"])
|
||||
|
||||
@@ -77,7 +77,7 @@ impl State {
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
@@ -164,6 +164,9 @@ impl From<TenantStateError> for ApiError {
|
||||
fn from(tse: TenantStateError) -> ApiError {
|
||||
match tse {
|
||||
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
|
||||
TenantStateError::NotActive(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant not yet active".into())
|
||||
}
|
||||
TenantStateError::IsStopping(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant is stopping".into())
|
||||
}
|
||||
|
||||
@@ -29,11 +29,11 @@ use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::fmt::Display;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::ops::Bound::Included;
|
||||
use std::process::Command;
|
||||
use std::process::Stdio;
|
||||
@@ -49,6 +49,7 @@ use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -208,7 +209,7 @@ pub struct Tenant {
|
||||
|
||||
/// The remote storage generation, used to protect S3 objects from split-brain.
|
||||
/// Does not change over the lifetime of the [`Tenant`] object.
|
||||
///
|
||||
///
|
||||
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
|
||||
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
|
||||
generation: Generation,
|
||||
@@ -372,13 +373,6 @@ struct RemoteStartupData {
|
||||
remote_metadata: TimelineMetadata,
|
||||
}
|
||||
|
||||
struct TimelinePreload {
|
||||
timeline_id: TimelineId,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
index_part: Option<IndexPart>,
|
||||
metadata: TimelineMetadata,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum WaitToBecomeActiveError {
|
||||
WillNotBecomeActive {
|
||||
@@ -419,6 +413,11 @@ pub enum CreateTimelineError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
struct TenantDirectoryScan {
|
||||
sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
|
||||
timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
|
||||
}
|
||||
|
||||
enum CreateTimelineCause {
|
||||
Load,
|
||||
Delete,
|
||||
@@ -662,14 +661,41 @@ impl Tenant {
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
fn download_indices(
|
||||
&self,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
) -> JoinSet<Result<(TimelineId, RemoteTimelineClient, MaybeDeletedIndexPart), anyhow::Error>>
|
||||
{
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
///
|
||||
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
|
||||
if !tokio::fs::try_exists(&marker_file)
|
||||
.await
|
||||
.context("check for existence of marker file")?
|
||||
{
|
||||
anyhow::bail!(
|
||||
"implementation error: marker file should exist at beginning of this function"
|
||||
);
|
||||
}
|
||||
|
||||
// Get list of remote timelines
|
||||
// download index files for every tenant timeline
|
||||
info!("listing remote timelines");
|
||||
|
||||
let remote_storage = self
|
||||
.remote_storage
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
|
||||
|
||||
let remote_timeline_ids =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
|
||||
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
|
||||
// Download & parse index parts
|
||||
let mut part_downloads = JoinSet::new();
|
||||
for timeline_id in timeline_ids {
|
||||
for timeline_id in remote_timeline_ids {
|
||||
let client = RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.deletion_queue_client.clone(),
|
||||
@@ -698,56 +724,11 @@ impl Tenant {
|
||||
);
|
||||
}
|
||||
|
||||
part_downloads
|
||||
}
|
||||
|
||||
/// Special variant of preload_timelines that does not rely on remote storage
|
||||
async fn preload_timelines_local(
|
||||
self: &Arc<Self>,
|
||||
timeline_ids: &HashSet<TimelineId>,
|
||||
) -> anyhow::Result<Vec<TimelinePreload>> {
|
||||
let mut preload_map = HashMap::new();
|
||||
for timeline_id in timeline_ids {
|
||||
let metadata = load_metadata(self.conf, &self.tenant_id, timeline_id)?;
|
||||
preload_map.insert(
|
||||
*timeline_id,
|
||||
TimelinePreload {
|
||||
timeline_id: *timeline_id,
|
||||
remote_client: None,
|
||||
// TODO: synthesize an index_part and make it non-optional
|
||||
index_part: None,
|
||||
metadata,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Sort by ancestry
|
||||
Ok(
|
||||
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
|
||||
.into_iter()
|
||||
.map(|i| i.1)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Do the remote I/O and sorting required to prepare a list of timelines
|
||||
/// with their IndexParts, ready for hydrating into `Timeline`
|
||||
async fn preload_timelines(
|
||||
self: &Arc<Self>,
|
||||
timeline_ids: HashSet<TimelineId>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
) -> anyhow::Result<Vec<TimelinePreload>> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let mut part_downloads = self.download_indices(timeline_ids, remote_storage);
|
||||
|
||||
let mut timelines_to_resume_deletions = vec![];
|
||||
|
||||
// We construct a map all timeline's preload state, prior to sorting
|
||||
// it by ancestry at the end of the function
|
||||
let mut preload_map: HashMap<TimelineId, TimelinePreload> = HashMap::new();
|
||||
|
||||
// Wait for all the download tasks to complete & collect results.
|
||||
let mut remote_index_and_client = HashMap::new();
|
||||
let mut timeline_ancestors = HashMap::new();
|
||||
while let Some(result) = part_downloads.join_next().await {
|
||||
// NB: we already added timeline_id as context to the error
|
||||
let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
|
||||
@@ -755,16 +736,8 @@ impl Tenant {
|
||||
debug!("successfully downloaded index part for timeline {timeline_id}");
|
||||
match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => {
|
||||
let metadata = index_part.metadata.clone();
|
||||
preload_map.insert(
|
||||
timeline_id,
|
||||
TimelinePreload {
|
||||
timeline_id,
|
||||
remote_client: Some(client),
|
||||
index_part: Some(index_part),
|
||||
metadata,
|
||||
},
|
||||
);
|
||||
timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
|
||||
remote_index_and_client.insert(timeline_id, (index_part, client));
|
||||
}
|
||||
MaybeDeletedIndexPart::Deleted(index_part) => {
|
||||
info!(
|
||||
@@ -776,6 +749,35 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
.expect("just put it in above");
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
TimelineResources {
|
||||
remote_client: Some(remote_client),
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
// Walk through deleted timelines, resume deletion
|
||||
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
|
||||
remote_timeline_client
|
||||
@@ -796,81 +798,6 @@ impl Tenant {
|
||||
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
|
||||
}
|
||||
|
||||
// Sort by ancestry
|
||||
Ok(
|
||||
tree_sort_timelines(preload_map, |p| p.metadata.ancestor_timeline())?
|
||||
.into_iter()
|
||||
.map(|i| i.1)
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
/// No background tasks are started as part of this routine.
|
||||
///
|
||||
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
|
||||
if !tokio::fs::try_exists(&marker_file)
|
||||
.await
|
||||
.context("check for existence of marker file")?
|
||||
{
|
||||
anyhow::bail!(
|
||||
"implementation error: marker file should exist at beginning of this function"
|
||||
);
|
||||
}
|
||||
|
||||
// Get list of remote timelines
|
||||
info!("listing remote timelines");
|
||||
|
||||
let remote_storage = self
|
||||
.remote_storage
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
|
||||
|
||||
let remote_timeline_ids =
|
||||
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
|
||||
|
||||
info!("found {} timelines", remote_timeline_ids.len());
|
||||
|
||||
// Download & parse index parts
|
||||
let sorted_timelines = self
|
||||
.preload_timelines(remote_timeline_ids, remote_storage)
|
||||
.await?;
|
||||
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
for timeline_preload in sorted_timelines {
|
||||
let TimelinePreload {
|
||||
timeline_id,
|
||||
remote_client,
|
||||
index_part,
|
||||
metadata: _,
|
||||
} = timeline_preload;
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part.unwrap(),
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
std::fs::remove_file(&marker_file)
|
||||
.with_context(|| format!("unlink attach marker file {marker_file}"))?;
|
||||
crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
|
||||
@@ -903,6 +830,7 @@ impl Tenant {
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
resources: TimelineResources,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -913,7 +841,7 @@ impl Tenant {
|
||||
.await
|
||||
.context("Failed to create new timeline directory")?;
|
||||
|
||||
let ancestor = if let Some(ancestor_id) = index_part.metadata.ancestor_timeline() {
|
||||
let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
|
||||
|| {
|
||||
@@ -931,7 +859,6 @@ impl Tenant {
|
||||
// cannot be older than the local one
|
||||
let local_metadata = None;
|
||||
|
||||
let remote_metadata = index_part.metadata.clone();
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
resources,
|
||||
@@ -1105,9 +1032,12 @@ impl Tenant {
|
||||
tenant
|
||||
}
|
||||
|
||||
async fn scan_timelines_dir(self: &Arc<Tenant>) -> anyhow::Result<HashSet<TimelineId>> {
|
||||
let mut timelines_to_load: HashSet<TimelineId> = HashSet::new();
|
||||
let mut timelines_to_resume_deletion: HashSet<TimelineId> = HashSet::new();
|
||||
fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
|
||||
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
|
||||
// Note timelines_to_resume_deletion needs to be separate because it can be not sortable
|
||||
// from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
|
||||
// completed in non topological order (for example because parent has smaller number of layer files in it)
|
||||
let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
|
||||
|
||||
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
|
||||
|
||||
@@ -1156,7 +1086,38 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
info!("Found deletion mark for timeline {}", timeline_id);
|
||||
timelines_to_resume_deletion.insert(timeline_id);
|
||||
|
||||
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
|
||||
Ok(metadata) => {
|
||||
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
|
||||
}
|
||||
Err(e) => match &e {
|
||||
LoadMetadataError::Read(r) => {
|
||||
if r.kind() != io::ErrorKind::NotFound {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
});
|
||||
}
|
||||
|
||||
// If metadata doesnt exist it means that we've crashed without
|
||||
// completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
|
||||
// So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
|
||||
// We cant do it here because the method is async so we'd need block_on
|
||||
// and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
|
||||
// so that basically results in a cycle:
|
||||
// spawn_blocking
|
||||
// - block_on
|
||||
// - spawn_blocking
|
||||
// which can lead to running out of threads in blocing pool.
|
||||
timelines_to_resume_deletion.push((timeline_id, None));
|
||||
}
|
||||
_ => {
|
||||
return Err(anyhow::anyhow!(e)).with_context(|| {
|
||||
format!("Failed to load metadata for timeline_id {timeline_id}")
|
||||
})
|
||||
}
|
||||
},
|
||||
}
|
||||
} else {
|
||||
if !timeline_dir.exists() {
|
||||
warn!("Timeline dir entry become invalid: {timeline_dir}");
|
||||
@@ -1194,7 +1155,9 @@ impl Tenant {
|
||||
|
||||
let file_name = entry.file_name();
|
||||
if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
|
||||
timelines_to_load.insert(timeline_id);
|
||||
let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
|
||||
.context("failed to load metadata")?;
|
||||
timelines_to_load.insert(timeline_id, metadata);
|
||||
} else {
|
||||
// A file or directory that doesn't look like a timeline ID
|
||||
warn!("unexpected file or directory in timelines directory: {file_name}");
|
||||
@@ -1202,18 +1165,14 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
for timeline_id in timelines_to_resume_deletion {
|
||||
if let Err(e) =
|
||||
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id).await
|
||||
{
|
||||
warn!(
|
||||
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
|
||||
timeline_id, e
|
||||
);
|
||||
// Sort the array of timeline IDs into tree-order, so that parent comes before
|
||||
// all its children.
|
||||
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
|
||||
TenantDirectoryScan {
|
||||
sorted_timelines_to_load: sorted_timelines,
|
||||
timelines_to_resume_deletion,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(timelines_to_load)
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1236,34 +1195,24 @@ impl Tenant {
|
||||
//
|
||||
// Scan the directory, peek into the metadata file of each timeline, and
|
||||
// collect a list of timelines and their ancestors.
|
||||
let span = info_span!("blocking");
|
||||
let cloned = Arc::clone(self);
|
||||
|
||||
let local_timelines = tokio::task::spawn(async move { cloned.scan_timelines_dir().await })
|
||||
.await
|
||||
.context("load spawn_blocking")
|
||||
.and_then(|res| res)?;
|
||||
let scan = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
cloned.scan_and_sort_timelines_dir()
|
||||
})
|
||||
.await
|
||||
.context("load spawn_blocking")
|
||||
.and_then(|res| res)?;
|
||||
|
||||
let sorted_timelines = match &self.remote_storage {
|
||||
Some(remote_storage) => {
|
||||
self.preload_timelines(local_timelines, remote_storage)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
// Deprecated mode, only used in dev.
|
||||
self.preload_timelines_local(&local_timelines).await?
|
||||
}
|
||||
};
|
||||
|
||||
for timeline_preload in sorted_timelines {
|
||||
let TimelinePreload {
|
||||
timeline_id,
|
||||
remote_client: _,
|
||||
index_part: _,
|
||||
metadata,
|
||||
} = timeline_preload;
|
||||
// FIXME original collect_timeline_files contained one more check:
|
||||
// 1. "Timeline has no ancestor and no layer files"
|
||||
|
||||
// Process loadable timelines first
|
||||
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, metadata, init_order, ctx, false)
|
||||
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
@@ -1280,6 +1229,43 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
// Resume deletion ones with deleted_mark
|
||||
for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
|
||||
match maybe_local_metadata {
|
||||
None => {
|
||||
// See comment in `scan_and_sort_timelines_dir`.
|
||||
if let Err(e) =
|
||||
DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
|
||||
timeline_id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
Some(local_metadata) => {
|
||||
if let Err(e) = self
|
||||
.load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
|
||||
.await
|
||||
{
|
||||
match e {
|
||||
LoadLocalTimelineError::Load(source) => {
|
||||
// We tried to load deleted timeline, this is a bug.
|
||||
return Err(anyhow::anyhow!(source).context(
|
||||
"This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
|
||||
));
|
||||
}
|
||||
LoadLocalTimelineError::ResumeDeletion(source) => {
|
||||
// Make sure resumed deletion wont fail loading for entire tenant.
|
||||
error!("Failed to resume timeline deletion: {source:#}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("Done");
|
||||
|
||||
Ok(())
|
||||
@@ -2769,11 +2755,6 @@ impl Tenant {
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
|
||||
let start_lsn = start_lsn.unwrap_or_else(|| {
|
||||
let lsn = src_timeline.get_last_record_lsn();
|
||||
@@ -2781,6 +2762,11 @@ impl Tenant {
|
||||
lsn
|
||||
});
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// Create a placeholder for the new branch. This will error
|
||||
// out if the new timeline ID is already in use.
|
||||
let timeline_uninit_mark = {
|
||||
|
||||
@@ -31,7 +31,7 @@ use super::{
|
||||
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTenantError {
|
||||
pub enum DeleteTenantError {
|
||||
#[error("GetTenant {0}")]
|
||||
Get(#[from] GetTenantError),
|
||||
|
||||
@@ -376,7 +376,7 @@ impl DeleteTenantFlow {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn should_resume_deletion(
|
||||
pub async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
|
||||
@@ -50,7 +50,7 @@ use super::TenantSharedResources;
|
||||
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
|
||||
/// having a properly acquired generation (Secondary doesn't need a generation)
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum TenantSlot {
|
||||
pub enum TenantSlot {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary,
|
||||
}
|
||||
@@ -481,7 +481,7 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
/// management API. For example, it could attach the tenant on a different pageserver.
|
||||
/// We would then be in split-brain once this pageserver restarts.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn shutdown_all_tenants() {
|
||||
pub async fn shutdown_all_tenants() {
|
||||
shutdown_all_tenants0(&TENANTS).await
|
||||
}
|
||||
|
||||
@@ -593,7 +593,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
|
||||
// caller will log how long we took
|
||||
}
|
||||
|
||||
pub(crate) async fn create_tenant(
|
||||
pub async fn create_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
@@ -628,14 +628,14 @@ pub(crate) async fn create_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SetNewTenantConfigError {
|
||||
pub enum SetNewTenantConfigError {
|
||||
#[error(transparent)]
|
||||
GetTenant(#[from] GetTenantError),
|
||||
#[error(transparent)]
|
||||
Persist(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn set_new_tenant_config(
|
||||
pub async fn set_new_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
@@ -776,7 +776,7 @@ pub(crate) async fn upsert_location(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum GetTenantError {
|
||||
pub enum GetTenantError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
@@ -792,7 +792,7 @@ pub(crate) enum GetTenantError {
|
||||
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
|
||||
///
|
||||
/// This method is cancel-safe.
|
||||
pub(crate) async fn get_tenant(
|
||||
pub async fn get_tenant(
|
||||
tenant_id: TenantId,
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
@@ -817,7 +817,7 @@ pub(crate) async fn get_tenant(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
pub async fn delete_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenant_id: TenantId,
|
||||
@@ -826,7 +826,7 @@ pub(crate) async fn delete_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTimelineError {
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("Tenant {0}")]
|
||||
Tenant(#[from] GetTenantError),
|
||||
|
||||
@@ -834,7 +834,7 @@ pub(crate) enum DeleteTimelineError {
|
||||
Timeline(#[from] crate::tenant::DeleteTimelineError),
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
pub async fn delete_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
_ctx: &RequestContext,
|
||||
@@ -845,16 +845,18 @@ pub(crate) async fn delete_timeline(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantStateError {
|
||||
pub enum TenantStateError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is stopping")]
|
||||
IsStopping(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn detach_tenant(
|
||||
pub async fn detach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
detach_ignored: bool,
|
||||
@@ -924,7 +926,7 @@ async fn detach_tenant0(
|
||||
removal_result
|
||||
}
|
||||
|
||||
pub(crate) async fn load_tenant(
|
||||
pub async fn load_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
@@ -961,7 +963,7 @@ pub(crate) async fn load_tenant(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn ignore_tenant(
|
||||
pub async fn ignore_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), TenantStateError> {
|
||||
@@ -989,7 +991,7 @@ async fn ignore_tenant0(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantMapListError {
|
||||
pub enum TenantMapListError {
|
||||
#[error("tenant map is still initiailizing")]
|
||||
Initializing,
|
||||
}
|
||||
@@ -997,7 +999,7 @@ pub(crate) enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().await;
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
@@ -1015,7 +1017,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
|
||||
///
|
||||
/// Downloading all the tenant data is performed in the background, this merely
|
||||
/// spawns the background task and returns quickly.
|
||||
pub(crate) async fn attach_tenant(
|
||||
pub async fn attach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
@@ -1052,7 +1054,7 @@ pub(crate) async fn attach_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantMapInsertError {
|
||||
pub enum TenantMapInsertError {
|
||||
#[error("tenant map is still initializing")]
|
||||
StillInitializing,
|
||||
#[error("tenant map is shutting down")]
|
||||
@@ -1215,7 +1217,7 @@ use {
|
||||
utils::http::error::ApiError,
|
||||
};
|
||||
|
||||
pub(crate) async fn immediate_gc(
|
||||
pub async fn immediate_gc(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
gc_req: TimelineGcRequest,
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use futures::future::Either;
|
||||
use proxy::auth;
|
||||
use proxy::config::HttpConfig;
|
||||
use proxy::console;
|
||||
use proxy::http;
|
||||
use proxy::metrics;
|
||||
@@ -80,9 +79,6 @@ struct ProxyCliArgs {
|
||||
/// Allow self-signed certificates for compute nodes (for testing)
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
allow_self_signed_compute: bool,
|
||||
/// timeout for http connections
|
||||
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
|
||||
sql_over_http_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -224,15 +220,12 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
auth::BackendType::Link(Cow::Owned(url))
|
||||
}
|
||||
};
|
||||
let http_config = HttpConfig {
|
||||
sql_over_http_timeout: args.sql_over_http_timeout,
|
||||
};
|
||||
|
||||
let config = Box::leak(Box::new(ProxyConfig {
|
||||
tls_config,
|
||||
auth_backend,
|
||||
metric_collection,
|
||||
allow_self_signed_compute: args.allow_self_signed_compute,
|
||||
http_config,
|
||||
}));
|
||||
|
||||
Ok(config)
|
||||
|
||||
@@ -13,7 +13,6 @@ pub struct ProxyConfig {
|
||||
pub auth_backend: auth::BackendType<'static, ()>,
|
||||
pub metric_collection: Option<MetricCollectionConfig>,
|
||||
pub allow_self_signed_compute: bool,
|
||||
pub http_config: HttpConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -27,10 +26,6 @@ pub struct TlsConfig {
|
||||
pub common_names: Option<HashSet<String>>,
|
||||
}
|
||||
|
||||
pub struct HttpConfig {
|
||||
pub sql_over_http_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
impl TlsConfig {
|
||||
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
|
||||
self.config.clone()
|
||||
|
||||
@@ -20,7 +20,6 @@ use tokio_postgres::AsyncMessage;
|
||||
use crate::{
|
||||
auth, console,
|
||||
metrics::{Ids, MetricCounter, USAGE_METRICS},
|
||||
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
|
||||
};
|
||||
use crate::{compute, config};
|
||||
|
||||
@@ -419,42 +418,36 @@ async fn connect_to_compute_once(
|
||||
};
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
}
|
||||
}).await
|
||||
}
|
||||
}
|
||||
})
|
||||
.instrument(span)
|
||||
);
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ use url::Url;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::http::json::json_response;
|
||||
|
||||
use crate::config::HttpConfig;
|
||||
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
|
||||
@@ -50,6 +47,7 @@ enum Payload {
|
||||
|
||||
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
|
||||
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
|
||||
const HTTP_CONNECTION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(15);
|
||||
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
|
||||
@@ -191,10 +189,9 @@ pub async fn handle(
|
||||
sni_hostname: Option<String>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
config: &'static HttpConfig,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let result = tokio::time::timeout(
|
||||
config.sql_over_http_timeout,
|
||||
HTTP_CONNECTION_TIMEOUT,
|
||||
handle_inner(request, sni_hostname, conn_pool, session_id),
|
||||
)
|
||||
.await;
|
||||
@@ -225,7 +222,7 @@ pub async fn handle(
|
||||
Err(_) => {
|
||||
let message = format!(
|
||||
"HTTP-Connection timed out, execution time exeeded {} seconds",
|
||||
config.sql_over_http_timeout.as_secs()
|
||||
HTTP_CONNECTION_TIMEOUT.as_secs()
|
||||
);
|
||||
error!(message);
|
||||
json_response(
|
||||
@@ -248,13 +245,6 @@ async fn handle_inner(
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Response<Body>> {
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
}
|
||||
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
|
||||
@@ -3,10 +3,7 @@ use crate::{
|
||||
config::ProxyConfig,
|
||||
error::io_error,
|
||||
protocol2::{ProxyProtocolAccept, WithClientIp},
|
||||
proxy::{
|
||||
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
|
||||
},
|
||||
proxy::{handle_client, ClientMode},
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
@@ -205,14 +202,7 @@ async fn ws_handler(
|
||||
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
|
||||
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
sql_over_http::handle(
|
||||
request,
|
||||
sni_hostname,
|
||||
conn_pool,
|
||||
session_id,
|
||||
&config.http_config,
|
||||
)
|
||||
.await
|
||||
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
|
||||
Response::builder()
|
||||
.header("Allow", "OPTIONS, POST")
|
||||
@@ -285,25 +275,23 @@ pub async fn task_main(
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
|
||||
move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
},
|
||||
)))
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
}))
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -315,41 +303,3 @@ pub async fn task_main(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MetricService<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> MetricService<S> {
|
||||
fn new(inner: S) -> MetricService<S> {
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
MetricService { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Drop for MetricService<S> {
|
||||
fn drop(&mut self) {
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
|
||||
where
|
||||
S: hyper::service::Service<Request<ReqBody>>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,55 +39,19 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
const ERR_PROTO_VIOLATION: &str = "protocol violation";
|
||||
|
||||
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_opened_db_connections_total",
|
||||
"Number of opened connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_db_connections_total",
|
||||
"Number of closed connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_opened_client_connections_total",
|
||||
"Number of opened connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_client_connections_total",
|
||||
"Number of closed connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_accepted_connections_total",
|
||||
"Number of client connections accepted.",
|
||||
"Number of TCP client connections accepted.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_connections_total",
|
||||
"Number of client connections closed.",
|
||||
"Number of TCP client connections closed.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
@@ -254,16 +218,12 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
"handling interactive connection from client"
|
||||
);
|
||||
|
||||
let proto = mode.protocol_label();
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
// The `closed` counter will increase when this future is destroyed.
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.with_label_values(&[mode.protocol_label()])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
|
||||
}
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
@@ -298,7 +258,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
mode.allow_self_signed_compute(config),
|
||||
);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session, mode))
|
||||
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -774,7 +734,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
async fn connect_to_db(
|
||||
self,
|
||||
session: cancellation::Session<'_>,
|
||||
mode: ClientMode,
|
||||
allow_cleartext: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let Self {
|
||||
mut stream,
|
||||
@@ -790,7 +750,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
};
|
||||
|
||||
let auth_result = match creds
|
||||
.authenticate(&extra, &mut stream, mode.allow_cleartext())
|
||||
.authenticate(&extra, &mut stream, allow_cleartext)
|
||||
.await
|
||||
{
|
||||
Ok(auth_result) => auth_result,
|
||||
@@ -816,14 +776,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
let proto = mode.protocol_label();
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
}
|
||||
|
||||
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
// PqStream input buffer. Normally there is none, but our serverless npm
|
||||
|
||||
@@ -374,12 +374,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
if conf.http_auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
|
||||
["/v1/status", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect()
|
||||
});
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
|
||||
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
|
||||
if ALLOWLIST_ROUTES.contains(request.uri()) {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -65,7 +65,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
|
||||
|
||||
def start_single_table_workload(table_id: int):
|
||||
for _ in range(num_iters):
|
||||
with env.pg.connect(options="-cstatement_timeout=300s").cursor() as cur:
|
||||
with env.pg.connect().cursor() as cur:
|
||||
cur.execute(
|
||||
f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
|
||||
)
|
||||
|
||||
@@ -6,7 +6,6 @@ from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
|
||||
@@ -598,10 +597,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
|
||||
assert res == expected_sum
|
||||
|
||||
|
||||
# Do inserts while restarting postgres and messing with safekeeper addresses.
|
||||
# The test takes more than default 5 minutes on Postgres 16,
|
||||
# see https://github.com/neondatabase/neon/issues/5305
|
||||
@pytest.mark.timeout(600)
|
||||
# do inserts while restarting postgres and messing with safekeeper addresses
|
||||
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
Reference in New Issue
Block a user