mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
2 Commits
walredo-re
...
problame/d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b359c9e51 | ||
|
|
cb19f95872 |
25
Cargo.lock
generated
25
Cargo.lock
generated
@@ -570,9 +570,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-types"
|
||||
version = "0.56.0"
|
||||
version = "0.56.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eed0a94eefd845a2a78677f1b72f02fa75802d38f7f59be675add140279aa8bf"
|
||||
checksum = "d90dbc8da2f6be461fa3c1906b20af8f79d14968fe47f2b7d29d086f62a51728"
|
||||
dependencies = [
|
||||
"base64-simd",
|
||||
"itoa",
|
||||
@@ -2420,6 +2420,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
@@ -2652,6 +2662,12 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "pagectl"
|
||||
version = "0.1.0"
|
||||
@@ -3748,9 +3764,11 @@ dependencies = [
|
||||
"aws-config",
|
||||
"aws-sdk-s3",
|
||||
"aws-smithy-http",
|
||||
"aws-smithy-types",
|
||||
"aws-types",
|
||||
"bincode",
|
||||
"bytes",
|
||||
"camino",
|
||||
"chrono",
|
||||
"clap",
|
||||
"crc32c",
|
||||
@@ -3758,9 +3776,11 @@ dependencies = [
|
||||
"futures-util",
|
||||
"hex",
|
||||
"histogram",
|
||||
"humantime",
|
||||
"itertools",
|
||||
"pageserver",
|
||||
"rand",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -4942,6 +4962,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"serde",
|
||||
|
||||
@@ -224,8 +224,8 @@ RUN wget https://github.com/df7cb/postgresql-unit/archive/refs/tags/7.7.tar.gz -
|
||||
FROM build-deps AS vector-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.1.tar.gz -O pgvector.tar.gz && \
|
||||
echo "cc7a8e034a96e30a819911ac79d32f6bc47bdd1aa2de4d7d4904e26b83209dc8 pgvector.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.5.0.tar.gz -O pgvector.tar.gz && \
|
||||
echo "d8aa3504b215467ca528525a6de12c3f85f9891b091ce0e5864dd8a9b757f77b pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xvzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! allowing multiple api users to independently work with the same S3 bucket, if
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::{
|
||||
@@ -556,20 +556,6 @@ impl RemoteStorage for S3Bucket {
|
||||
.deleted_objects_total
|
||||
.inc_by(chunk.len() as u64);
|
||||
if let Some(errors) = resp.errors {
|
||||
// Log a bounded number of the errors within the response:
|
||||
// these requests can carry 1000 keys so logging each one
|
||||
// would be too verbose, especially as errors may lead us
|
||||
// to retry repeatedly.
|
||||
const LOG_UP_TO_N_ERRORS: usize = 10;
|
||||
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
|
||||
tracing::warn!(
|
||||
"DeleteObjects key {} failed: {}: {}",
|
||||
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.message.as_ref().map(Cow::from).unwrap_or("".into())
|
||||
);
|
||||
}
|
||||
|
||||
return Err(anyhow::format_err!(
|
||||
"Failed to delete {} objects",
|
||||
errors.len()
|
||||
|
||||
@@ -153,7 +153,7 @@ impl FlushOp {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DeletionQueueClient {
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
@@ -416,7 +416,7 @@ pub enum DeletionQueueError {
|
||||
impl DeletionQueueClient {
|
||||
pub(crate) fn broken() -> Self {
|
||||
// Channels whose receivers are immediately dropped.
|
||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (tx, _rx) = tokio::sync::mpsc::channel(1);
|
||||
let (executor_tx, _executor_rx) = tokio::sync::mpsc::channel(1);
|
||||
Self {
|
||||
tx,
|
||||
@@ -428,12 +428,12 @@ impl DeletionQueueClient {
|
||||
/// This is cancel-safe. If you drop the future before it completes, the message
|
||||
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
|
||||
/// we decide to do a deletion the decision is always final.
|
||||
fn do_push<T>(
|
||||
async fn do_push<T>(
|
||||
&self,
|
||||
queue: &tokio::sync::mpsc::UnboundedSender<T>,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
msg: T,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
match queue.send(msg) {
|
||||
match queue.send(msg).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// This shouldn't happen, we should shut down all tenants before
|
||||
@@ -445,7 +445,7 @@ impl DeletionQueueClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn recover(
|
||||
pub(crate) async fn recover(
|
||||
&self,
|
||||
attached_tenants: HashMap<TenantId, Generation>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
@@ -453,6 +453,7 @@ impl DeletionQueueClient {
|
||||
&self.tx,
|
||||
ListWriterQueueMessage::Recover(RecoverOp { attached_tenants }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// When a Timeline wishes to update the remote_consistent_lsn that it exposes to the outside
|
||||
@@ -525,21 +526,6 @@ impl DeletionQueueClient {
|
||||
return self.flush_immediate().await;
|
||||
}
|
||||
|
||||
self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
|
||||
}
|
||||
|
||||
/// When a Tenant has a generation, push_layers is always synchronous because
|
||||
/// the ListValidator channel is an unbounded channel.
|
||||
///
|
||||
/// This can be merged into push_layers when we remove the Generation-less mode
|
||||
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
|
||||
pub(crate) fn push_layers_sync(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerFileName, Generation)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(layers.len() as u64);
|
||||
@@ -553,16 +539,17 @@ impl DeletionQueueClient {
|
||||
objects: Vec::new(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// This is cancel-safe. If you drop the future the flush may still happen in the background.
|
||||
async fn do_flush<T>(
|
||||
&self,
|
||||
queue: &tokio::sync::mpsc::UnboundedSender<T>,
|
||||
queue: &tokio::sync::mpsc::Sender<T>,
|
||||
msg: T,
|
||||
rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
self.do_push(queue, msg)?;
|
||||
self.do_push(queue, msg).await?;
|
||||
if rx.await.is_err() {
|
||||
// This shouldn't happen if tenants are shut down before deletion queue. If we
|
||||
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
|
||||
@@ -583,18 +570,6 @@ impl DeletionQueueClient {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Issue a flush without waiting for it to complete. This is useful on advisory flushes where
|
||||
/// the caller wants to avoid the risk of waiting for lots of enqueued work, such as on tenant
|
||||
/// detach where flushing is nice but not necessary.
|
||||
///
|
||||
/// This function provides no guarantees of work being done.
|
||||
pub fn flush_advisory(&self) {
|
||||
let (flush_op, _) = FlushOp::new();
|
||||
|
||||
// Transmit the flush message, ignoring any result (such as a closed channel during shutdown).
|
||||
drop(self.tx.send(ListWriterQueueMessage::FlushExecute(flush_op)));
|
||||
}
|
||||
|
||||
// Wait until all previous deletions are executed
|
||||
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
|
||||
debug!("flush_execute: flushing to deletion lists...");
|
||||
@@ -611,7 +586,9 @@ impl DeletionQueueClient {
|
||||
// Flush any immediate-mode deletions (the above backend flush will only flush
|
||||
// the executor if deletions had flowed through the backend)
|
||||
debug!("flush_execute: flushing execution...");
|
||||
self.flush_immediate().await?;
|
||||
let (flush_op, rx) = FlushOp::new();
|
||||
self.do_flush(&self.executor_tx, DeleterMessage::Flush(flush_op), rx)
|
||||
.await?;
|
||||
debug!("flush_execute: finished flushing execution...");
|
||||
Ok(())
|
||||
}
|
||||
@@ -666,10 +643,8 @@ impl DeletionQueue {
|
||||
where
|
||||
C: ControlPlaneGenerationsApi + Send + Sync,
|
||||
{
|
||||
// Unbounded channel: enables non-async functions to submit deletions. The actual length is
|
||||
// constrained by how promptly the ListWriter wakes up and drains it, which should be frequent
|
||||
// enough to avoid this taking pathologically large amount of memory.
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
// Deep channel: it consumes deletions from all timelines and we do not want to block them
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
|
||||
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
|
||||
@@ -982,7 +957,7 @@ mod test {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new())?;
|
||||
client.recover(HashMap::new()).await?;
|
||||
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
@@ -1050,7 +1025,7 @@ mod test {
|
||||
async fn deletion_queue_validation() -> anyhow::Result<()> {
|
||||
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new())?;
|
||||
client.recover(HashMap::new()).await?;
|
||||
|
||||
// Generation that the control plane thinks is current
|
||||
let latest_generation = Generation::new(0xdeadbeef);
|
||||
@@ -1107,7 +1082,7 @@ mod test {
|
||||
// Basic test that the deletion queue processes the deletions we pass into it
|
||||
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::new())?;
|
||||
client.recover(HashMap::new()).await?;
|
||||
|
||||
let tenant_id = ctx.harness.tenant_id;
|
||||
|
||||
@@ -1170,7 +1145,9 @@ mod test {
|
||||
drop(client);
|
||||
ctx.restart().await;
|
||||
let client = ctx.deletion_queue.new_client();
|
||||
client.recover(HashMap::from([(tenant_id, now_generation)]))?;
|
||||
client
|
||||
.recover(HashMap::from([(tenant_id, now_generation)]))
|
||||
.await?;
|
||||
|
||||
info!("Flush-executing");
|
||||
client.flush_execute().await?;
|
||||
@@ -1196,7 +1173,7 @@ pub(crate) mod mock {
|
||||
};
|
||||
|
||||
pub struct ConsumerState {
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
|
||||
}
|
||||
|
||||
@@ -1273,7 +1250,7 @@ pub(crate) mod mock {
|
||||
}
|
||||
|
||||
pub struct MockDeletionQueue {
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
executed: Arc<AtomicUsize>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
@@ -1283,7 +1260,7 @@ pub(crate) mod mock {
|
||||
|
||||
impl MockDeletionQueue {
|
||||
pub fn new(remote_storage: Option<GenericRemoteStorage>) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16384);
|
||||
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16384);
|
||||
|
||||
let executed = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
@@ -13,7 +13,6 @@ use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use utils::backoff;
|
||||
|
||||
use crate::metrics;
|
||||
|
||||
@@ -64,19 +63,7 @@ impl Deleter {
|
||||
Err(anyhow::anyhow!("failpoint hit"))
|
||||
});
|
||||
|
||||
// A backoff::retry is used here for two reasons:
|
||||
// - To provide a backoff rather than busy-polling the API on errors
|
||||
// - To absorb transient 429/503 conditions without hitting our error
|
||||
// logging path for issues deleting objects.
|
||||
backoff::retry(
|
||||
|| async { self.remote_storage.delete_objects(&self.accumulator).await },
|
||||
|_| false,
|
||||
3,
|
||||
10,
|
||||
"executing deletion batch",
|
||||
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")),
|
||||
)
|
||||
.await
|
||||
self.remote_storage.delete_objects(&self.accumulator).await
|
||||
}
|
||||
|
||||
/// Block until everything in accumulator has been executed
|
||||
@@ -101,10 +88,7 @@ impl Deleter {
|
||||
self.accumulator.clear();
|
||||
}
|
||||
Err(e) => {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(DeletionQueueError::ShuttingDown);
|
||||
}
|
||||
warn!("DeleteObjects request failed: {e:#}, will continue trying");
|
||||
warn!("DeleteObjects request failed: {e:#}, will retry");
|
||||
metrics::DELETION_QUEUE
|
||||
.remote_errors
|
||||
.with_label_values(&["execute"])
|
||||
|
||||
@@ -85,7 +85,7 @@ pub(super) struct ListWriter {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
// Incoming frontend requests to delete some keys
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
|
||||
// Outbound requests to the backend to execute deletion lists we have composed.
|
||||
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
|
||||
@@ -111,7 +111,7 @@ impl ListWriter {
|
||||
|
||||
pub(super) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
rx: tokio::sync::mpsc::Receiver<ListWriterQueueMessage>,
|
||||
tx: tokio::sync::mpsc::Sender<ValidatorQueueMessage>,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
|
||||
@@ -77,7 +77,7 @@ impl State {
|
||||
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
@@ -164,6 +164,9 @@ impl From<TenantStateError> for ApiError {
|
||||
fn from(tse: TenantStateError) -> ApiError {
|
||||
match tse {
|
||||
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
|
||||
TenantStateError::NotActive(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant not yet active".into())
|
||||
}
|
||||
TenantStateError::IsStopping(_) => {
|
||||
ApiError::ResourceUnavailable("Tenant is stopping".into())
|
||||
}
|
||||
@@ -572,14 +575,9 @@ async fn tenant_detach_handler(
|
||||
|
||||
let state = get_state(&request);
|
||||
let conf = state.conf;
|
||||
mgr::detach_tenant(
|
||||
conf,
|
||||
tenant_id,
|
||||
detach_ignored.unwrap_or(false),
|
||||
&state.deletion_queue_client,
|
||||
)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
mgr::detach_tenant(conf, tenant_id, detach_ignored.unwrap_or(false))
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
@@ -1036,7 +1034,7 @@ async fn put_tenant_location_config_handler(
|
||||
// The `Detached` state is special, it doesn't upsert a tenant, it removes
|
||||
// its local disk content and drops it from memory.
|
||||
if let LocationConfigMode::Detached = request_data.config.mode {
|
||||
mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
|
||||
mgr::detach_tenant(conf, tenant_id, true)
|
||||
.instrument(info_span!("tenant_detach", %tenant_id))
|
||||
.await?;
|
||||
return json_response(StatusCode::OK, ());
|
||||
|
||||
@@ -45,7 +45,6 @@ use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::AttachedLocationConfig;
|
||||
use self::config::AttachmentMode;
|
||||
use self::config::LocationConf;
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
@@ -209,7 +208,7 @@ pub struct Tenant {
|
||||
|
||||
/// The remote storage generation, used to protect S3 objects from split-brain.
|
||||
/// Does not change over the lifetime of the [`Tenant`] object.
|
||||
///
|
||||
///
|
||||
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
|
||||
/// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
|
||||
generation: Generation,
|
||||
@@ -2077,15 +2076,6 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
|
||||
self.tenant_conf
|
||||
.read()
|
||||
.unwrap()
|
||||
.location
|
||||
.attach_mode
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
|
||||
@@ -2755,11 +2745,6 @@ impl Tenant {
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
|
||||
let start_lsn = start_lsn.unwrap_or_else(|| {
|
||||
let lsn = src_timeline.get_last_record_lsn();
|
||||
@@ -2767,6 +2752,11 @@ impl Tenant {
|
||||
lsn
|
||||
});
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// Create a placeholder for the new branch. This will error
|
||||
// out if the new timeline ID is already in use.
|
||||
let timeline_uninit_mark = {
|
||||
|
||||
@@ -31,7 +31,7 @@ use super::{
|
||||
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTenantError {
|
||||
pub enum DeleteTenantError {
|
||||
#[error("GetTenant {0}")]
|
||||
Get(#[from] GetTenantError),
|
||||
|
||||
@@ -376,7 +376,7 @@ impl DeleteTenantFlow {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn should_resume_deletion(
|
||||
pub async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
|
||||
@@ -24,7 +24,7 @@ use crate::control_plane_client::{
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::{
|
||||
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
|
||||
@@ -50,7 +50,7 @@ use super::TenantSharedResources;
|
||||
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
|
||||
/// having a properly acquired generation (Secondary doesn't need a generation)
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum TenantSlot {
|
||||
pub enum TenantSlot {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary,
|
||||
}
|
||||
@@ -206,7 +206,8 @@ async fn init_load_generations(
|
||||
if resources.remote_storage.is_some() {
|
||||
resources
|
||||
.deletion_queue_client
|
||||
.recover(generations.clone())?;
|
||||
.recover(generations.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Some(generations))
|
||||
@@ -481,7 +482,7 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
/// management API. For example, it could attach the tenant on a different pageserver.
|
||||
/// We would then be in split-brain once this pageserver restarts.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn shutdown_all_tenants() {
|
||||
pub async fn shutdown_all_tenants() {
|
||||
shutdown_all_tenants0(&TENANTS).await
|
||||
}
|
||||
|
||||
@@ -593,7 +594,7 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
|
||||
// caller will log how long we took
|
||||
}
|
||||
|
||||
pub(crate) async fn create_tenant(
|
||||
pub async fn create_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
@@ -628,14 +629,14 @@ pub(crate) async fn create_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SetNewTenantConfigError {
|
||||
pub enum SetNewTenantConfigError {
|
||||
#[error(transparent)]
|
||||
GetTenant(#[from] GetTenantError),
|
||||
#[error(transparent)]
|
||||
Persist(anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn set_new_tenant_config(
|
||||
pub async fn set_new_tenant_config(
|
||||
conf: &'static PageServerConf,
|
||||
new_tenant_conf: TenantConfOpt,
|
||||
tenant_id: TenantId,
|
||||
@@ -694,18 +695,6 @@ pub(crate) async fn upsert_location(
|
||||
|
||||
if let Some(tenant) = shutdown_tenant {
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's no point trying to flush deletions
|
||||
}
|
||||
};
|
||||
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
@@ -776,7 +765,7 @@ pub(crate) async fn upsert_location(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum GetTenantError {
|
||||
pub enum GetTenantError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
@@ -792,7 +781,7 @@ pub(crate) enum GetTenantError {
|
||||
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
|
||||
///
|
||||
/// This method is cancel-safe.
|
||||
pub(crate) async fn get_tenant(
|
||||
pub async fn get_tenant(
|
||||
tenant_id: TenantId,
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
@@ -817,7 +806,7 @@ pub(crate) async fn get_tenant(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
pub async fn delete_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenant_id: TenantId,
|
||||
@@ -826,7 +815,7 @@ pub(crate) async fn delete_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum DeleteTimelineError {
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("Tenant {0}")]
|
||||
Tenant(#[from] GetTenantError),
|
||||
|
||||
@@ -834,7 +823,7 @@ pub(crate) enum DeleteTimelineError {
|
||||
Timeline(#[from] crate::tenant::DeleteTimelineError),
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
pub async fn delete_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
_ctx: &RequestContext,
|
||||
@@ -845,29 +834,23 @@ pub(crate) async fn delete_timeline(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantStateError {
|
||||
pub enum TenantStateError {
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantId),
|
||||
#[error("Tenant {0} is stopping")]
|
||||
IsStopping(TenantId),
|
||||
#[error("Tenant {0} is not active")]
|
||||
NotActive(TenantId),
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn detach_tenant(
|
||||
pub async fn detach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<(), TenantStateError> {
|
||||
let tmp_path = detach_tenant0(
|
||||
conf,
|
||||
&TENANTS,
|
||||
tenant_id,
|
||||
detach_ignored,
|
||||
deletion_queue_client,
|
||||
)
|
||||
.await?;
|
||||
let tmp_path = detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await?;
|
||||
// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
|
||||
// After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
|
||||
let task_tenant_id = None;
|
||||
@@ -892,7 +875,6 @@ async fn detach_tenant0(
|
||||
tenants: &tokio::sync::RwLock<TenantsMap>,
|
||||
tenant_id: TenantId,
|
||||
detach_ignored: bool,
|
||||
deletion_queue_client: &DeletionQueueClient,
|
||||
) -> Result<Utf8PathBuf, TenantStateError> {
|
||||
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
|
||||
@@ -904,10 +886,6 @@ async fn detach_tenant0(
|
||||
let removal_result =
|
||||
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
|
||||
|
||||
// Flush pending deletions, so that they have a good chance of passing validation
|
||||
// before this tenant is potentially re-attached elsewhere.
|
||||
deletion_queue_client.flush_advisory();
|
||||
|
||||
// Ignored tenants are not present in memory and will bail the removal from memory operation.
|
||||
// Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
|
||||
if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
|
||||
@@ -924,7 +902,7 @@ async fn detach_tenant0(
|
||||
removal_result
|
||||
}
|
||||
|
||||
pub(crate) async fn load_tenant(
|
||||
pub async fn load_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
@@ -961,7 +939,7 @@ pub(crate) async fn load_tenant(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn ignore_tenant(
|
||||
pub async fn ignore_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), TenantStateError> {
|
||||
@@ -989,7 +967,7 @@ async fn ignore_tenant0(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantMapListError {
|
||||
pub enum TenantMapListError {
|
||||
#[error("tenant map is still initiailizing")]
|
||||
Initializing,
|
||||
}
|
||||
@@ -997,7 +975,7 @@ pub(crate) enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().await;
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
@@ -1015,7 +993,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
|
||||
///
|
||||
/// Downloading all the tenant data is performed in the background, this merely
|
||||
/// spawns the background task and returns quickly.
|
||||
pub(crate) async fn attach_tenant(
|
||||
pub async fn attach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
@@ -1052,7 +1030,7 @@ pub(crate) async fn attach_tenant(
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantMapInsertError {
|
||||
pub enum TenantMapInsertError {
|
||||
#[error("tenant map is still initializing")]
|
||||
StillInitializing,
|
||||
#[error("tenant map is shutting down")]
|
||||
@@ -1215,7 +1193,7 @@ use {
|
||||
utils::http::error::ApiError,
|
||||
};
|
||||
|
||||
pub(crate) async fn immediate_gc(
|
||||
pub async fn immediate_gc(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
gc_req: TimelineGcRequest,
|
||||
|
||||
@@ -30,7 +30,6 @@ use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::os::unix::prelude::CommandExt;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Mutex, MutexGuard};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -40,7 +39,7 @@ use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use crate::metrics::{
|
||||
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
|
||||
@@ -94,7 +93,6 @@ pub trait WalRedoManager: Send + Sync {
|
||||
}
|
||||
|
||||
struct ProcessInput {
|
||||
restart_no: u64,
|
||||
child: NoLeakChild,
|
||||
stdin: ChildStdin,
|
||||
stderr_fd: RawFd,
|
||||
@@ -103,7 +101,6 @@ struct ProcessInput {
|
||||
}
|
||||
|
||||
struct ProcessOutput {
|
||||
restart_no: u64,
|
||||
stdout: ChildStdout,
|
||||
pending_responses: VecDeque<Option<Bytes>>,
|
||||
n_processed_responses: usize,
|
||||
@@ -123,7 +120,6 @@ pub struct PostgresRedoManager {
|
||||
#[cfg(feature = "testing")]
|
||||
dump_sequence: AtomicUsize,
|
||||
|
||||
restart_counter: AtomicU64,
|
||||
stdout: Mutex<Option<ProcessOutput>>,
|
||||
stdin: Mutex<Option<ProcessInput>>,
|
||||
stderr: Mutex<Option<ChildStderr>>,
|
||||
@@ -232,7 +228,6 @@ impl PostgresRedoManager {
|
||||
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
restart_counter: AtomicU64::new(0),
|
||||
tenant_id,
|
||||
conf,
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -278,7 +273,6 @@ impl PostgresRedoManager {
|
||||
if proc.is_none() {
|
||||
self.launch(&mut proc, pg_version)?;
|
||||
}
|
||||
let restart_no = proc.as_ref().unwrap().restart_no;
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
@@ -328,12 +322,18 @@ impl PostgresRedoManager {
|
||||
// self.stdin only holds stdin & stderr as_raw_fd().
|
||||
// Dropping it as part of take() doesn't close them.
|
||||
// The owning objects (ChildStdout and ChildStderr) are stored in
|
||||
// self.stdout and self.stderr, respectively.
|
||||
// They will be closed when the new process is launched.
|
||||
// self.stdout and self.stderr, respectively.
|
||||
// We intentionally keep them open here to avoid a race between
|
||||
// currently running `apply_wal_records()` and a `launch()` call
|
||||
// after we return here.
|
||||
// The currently running `apply_wal_records()` must not read from
|
||||
// the newly launched process.
|
||||
// By keeping self.stdout and self.stderr open here, `launch()` will
|
||||
// get other file descriptors for the new child's stdout and stderr,
|
||||
// and hence the current `apply_wal_records()` calls will observe
|
||||
// `output.stdout.as_raw_fd() != stdout_fd` .
|
||||
if let Some(proc) = self.stdin.lock().unwrap().take() {
|
||||
if proc.restart_no == restart_no {
|
||||
proc.child.kill_and_wait();
|
||||
}
|
||||
proc.child.kill_and_wait();
|
||||
}
|
||||
} else if n_attempts != 0 {
|
||||
info!(n_attempts, "retried walredo succeeded");
|
||||
@@ -730,9 +730,7 @@ impl PostgresRedoManager {
|
||||
// all fallible operations post-spawn are complete, so get rid of the guard
|
||||
let child = scopeguard::ScopeGuard::into_inner(child);
|
||||
|
||||
let restart_no = self.restart_counter.fetch_add(1, Ordering::SeqCst);
|
||||
**input = Some(ProcessInput {
|
||||
restart_no,
|
||||
child,
|
||||
stdout_fd: stdout.as_raw_fd(),
|
||||
stderr_fd: stderr.as_raw_fd(),
|
||||
@@ -741,7 +739,6 @@ impl PostgresRedoManager {
|
||||
});
|
||||
|
||||
*self.stdout.lock().unwrap() = Some(ProcessOutput {
|
||||
restart_no,
|
||||
stdout,
|
||||
pending_responses: VecDeque::new(),
|
||||
n_processed_responses: 0,
|
||||
@@ -813,13 +810,13 @@ impl PostgresRedoManager {
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
let proc = input.as_mut().unwrap();
|
||||
let mut nwrite = 0usize;
|
||||
let restart_no = proc.restart_no;
|
||||
let stdout_fd = proc.stdout_fd;
|
||||
|
||||
// Prepare for calling poll()
|
||||
let mut pollfds = [
|
||||
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
|
||||
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
|
||||
PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
|
||||
PollFd::new(stdout_fd, PollFlags::POLLIN),
|
||||
];
|
||||
|
||||
// We do two things simultaneously: send the old base image and WAL records to
|
||||
@@ -894,10 +891,13 @@ impl PostgresRedoManager {
|
||||
|
||||
let mut output_guard = self.stdout.lock().unwrap();
|
||||
let output = output_guard.as_mut().unwrap();
|
||||
if output.restart_no != restart_no {
|
||||
// If restart_no changed, the walredo process crashed and was restarted
|
||||
// between dropping the 'input' lock and acquiring 'output'. In that case,
|
||||
// 'output' belongs to different process than where we sent the request.
|
||||
if output.stdout.as_raw_fd() != stdout_fd {
|
||||
// If the stdout file descriptor has changed, the walredo process has crashed and been restarted.
|
||||
// Since ProcessInput and ProcessOutput are protected by different mutexes,
|
||||
// it can happen that we send a request to one process and wait for the response from another.
|
||||
// To prevent such situation we compare stdout file descriptors.
|
||||
// Since the old stdout pipe is destroyed only after the new one is created,
|
||||
// it can not reuse the same file descriptor, so this check is safe.
|
||||
//
|
||||
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
|
||||
// That's where we kill the child process.
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use futures::future::Either;
|
||||
use proxy::auth;
|
||||
use proxy::config::HttpConfig;
|
||||
use proxy::console;
|
||||
use proxy::http;
|
||||
use proxy::metrics;
|
||||
@@ -80,9 +79,6 @@ struct ProxyCliArgs {
|
||||
/// Allow self-signed certificates for compute nodes (for testing)
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
allow_self_signed_compute: bool,
|
||||
/// timeout for http connections
|
||||
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
|
||||
sql_over_http_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -224,15 +220,12 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
auth::BackendType::Link(Cow::Owned(url))
|
||||
}
|
||||
};
|
||||
let http_config = HttpConfig {
|
||||
sql_over_http_timeout: args.sql_over_http_timeout,
|
||||
};
|
||||
|
||||
let config = Box::leak(Box::new(ProxyConfig {
|
||||
tls_config,
|
||||
auth_backend,
|
||||
metric_collection,
|
||||
allow_self_signed_compute: args.allow_self_signed_compute,
|
||||
http_config,
|
||||
}));
|
||||
|
||||
Ok(config)
|
||||
|
||||
@@ -13,7 +13,6 @@ pub struct ProxyConfig {
|
||||
pub auth_backend: auth::BackendType<'static, ()>,
|
||||
pub metric_collection: Option<MetricCollectionConfig>,
|
||||
pub allow_self_signed_compute: bool,
|
||||
pub http_config: HttpConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -27,10 +26,6 @@ pub struct TlsConfig {
|
||||
pub common_names: Option<HashSet<String>>,
|
||||
}
|
||||
|
||||
pub struct HttpConfig {
|
||||
pub sql_over_http_timeout: tokio::time::Duration,
|
||||
}
|
||||
|
||||
impl TlsConfig {
|
||||
pub fn to_server_config(&self) -> Arc<rustls::ServerConfig> {
|
||||
self.config.clone()
|
||||
|
||||
@@ -20,7 +20,6 @@ use tokio_postgres::AsyncMessage;
|
||||
use crate::{
|
||||
auth, console,
|
||||
metrics::{Ids, MetricCounter, USAGE_METRICS},
|
||||
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
|
||||
};
|
||||
use crate::{compute, config};
|
||||
|
||||
@@ -419,42 +418,36 @@ async fn connect_to_compute_once(
|
||||
};
|
||||
|
||||
tokio::spawn(
|
||||
async move {
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
}
|
||||
}).await
|
||||
}
|
||||
}
|
||||
})
|
||||
.instrument(span)
|
||||
);
|
||||
|
||||
|
||||
@@ -24,9 +24,6 @@ use url::Url;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::http::json::json_response;
|
||||
|
||||
use crate::config::HttpConfig;
|
||||
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
|
||||
@@ -102,9 +99,9 @@ fn json_array_to_pg_array(value: &Value) -> Result<Option<String>, serde_json::E
|
||||
// convert to text with escaping
|
||||
Value::Bool(_) => serde_json::to_string(value).map(Some),
|
||||
Value::Number(_) => serde_json::to_string(value).map(Some),
|
||||
Value::Object(_) => serde_json::to_string(value).map(Some),
|
||||
|
||||
// here string needs to be escaped, as it is part of the array
|
||||
Value::Object(_) => json_array_to_pg_array(&Value::String(serde_json::to_string(value)?)),
|
||||
Value::String(_) => serde_json::to_string(value).map(Some),
|
||||
|
||||
// recurse into array
|
||||
@@ -191,46 +188,28 @@ pub async fn handle(
|
||||
sni_hostname: Option<String>,
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
config: &'static HttpConfig,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let result = tokio::time::timeout(
|
||||
config.sql_over_http_timeout,
|
||||
handle_inner(request, sni_hostname, conn_pool, session_id),
|
||||
)
|
||||
.await;
|
||||
let result = handle_inner(request, sni_hostname, conn_pool, session_id).await;
|
||||
|
||||
let mut response = match result {
|
||||
Ok(r) => match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = e.downcast_ref::<tokio_postgres::Error>().and_then(|e| {
|
||||
e.code()
|
||||
.map(|s| serde_json::to_value(s.code()).unwrap_or_default())
|
||||
});
|
||||
let code = match code {
|
||||
Some(c) => c,
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = match e.downcast_ref::<tokio_postgres::Error>() {
|
||||
Some(e) => match e.code() {
|
||||
Some(e) => serde_json::to_value(e.code()).unwrap(),
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
// TODO: this shouldn't always be bad request.
|
||||
json_response(
|
||||
StatusCode::BAD_REQUEST,
|
||||
json!({ "message": message, "code": code }),
|
||||
)?
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
let message = format!(
|
||||
"HTTP-Connection timed out, execution time exeeded {} seconds",
|
||||
config.sql_over_http_timeout.as_secs()
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
error!(message);
|
||||
// TODO: this shouldn't always be bad request.
|
||||
json_response(
|
||||
StatusCode::GATEWAY_TIMEOUT,
|
||||
json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
|
||||
StatusCode::BAD_REQUEST,
|
||||
json!({ "message": message, "code": code }),
|
||||
)?
|
||||
}
|
||||
};
|
||||
@@ -248,13 +227,6 @@ async fn handle_inner(
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Response<Body>> {
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
}
|
||||
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
@@ -613,7 +585,7 @@ fn _pg_array_parse(
|
||||
}
|
||||
}
|
||||
}
|
||||
'}' if !quote => {
|
||||
'}' => {
|
||||
level -= 1;
|
||||
if level == 0 {
|
||||
push_checked(&mut entry, &mut entries, elem_type)?;
|
||||
@@ -697,14 +669,6 @@ mod tests {
|
||||
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
|
||||
)]
|
||||
);
|
||||
// array of objects
|
||||
let json = r#"[{"foo": 1},{"bar": 2}]"#;
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]).unwrap();
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -832,23 +796,4 @@ mod tests {
|
||||
json!([[[1, 2, 3], [4, 5, 6]]])
|
||||
);
|
||||
}
|
||||
#[test]
|
||||
fn test_pg_array_parse_json() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
|
||||
}
|
||||
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
|
||||
json!([{"foo": 1, "bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
|
||||
json!([{"foo": 1}, {"bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
|
||||
json!([[{"foo": 1}, {"bar": 2}]])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,10 +3,7 @@ use crate::{
|
||||
config::ProxyConfig,
|
||||
error::io_error,
|
||||
protocol2::{ProxyProtocolAccept, WithClientIp},
|
||||
proxy::{
|
||||
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
|
||||
},
|
||||
proxy::{handle_client, ClientMode},
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
@@ -205,14 +202,7 @@ async fn ws_handler(
|
||||
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
|
||||
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
sql_over_http::handle(
|
||||
request,
|
||||
sni_hostname,
|
||||
conn_pool,
|
||||
session_id,
|
||||
&config.http_config,
|
||||
)
|
||||
.await
|
||||
sql_over_http::handle(request, sni_hostname, conn_pool, session_id).await
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::OPTIONS {
|
||||
Response::builder()
|
||||
.header("Allow", "OPTIONS, POST")
|
||||
@@ -285,25 +275,23 @@ pub async fn task_main(
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
|
||||
move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
},
|
||||
)))
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
}))
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -315,41 +303,3 @@ pub async fn task_main(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MetricService<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> MetricService<S> {
|
||||
fn new(inner: S) -> MetricService<S> {
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
MetricService { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Drop for MetricService<S> {
|
||||
fn drop(&mut self) {
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
|
||||
where
|
||||
S: hyper::service::Service<Request<ReqBody>>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ use crate::{
|
||||
compute::{self, PostgresConnection},
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
|
||||
http::StatusCode,
|
||||
metrics::{Ids, USAGE_METRICS},
|
||||
protocol2::WithClientIp,
|
||||
stream::{PqStream, Stream},
|
||||
@@ -39,55 +38,19 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
const ERR_PROTO_VIOLATION: &str = "protocol violation";
|
||||
|
||||
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_opened_db_connections_total",
|
||||
"Number of opened connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_db_connections_total",
|
||||
"Number of closed connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_opened_client_connections_total",
|
||||
"Number of opened connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_client_connections_total",
|
||||
"Number of closed connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_accepted_connections_total",
|
||||
"Number of client connections accepted.",
|
||||
"Number of TCP client connections accepted.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_connections_total",
|
||||
"Number of client connections closed.",
|
||||
"Number of TCP client connections closed.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
@@ -112,15 +75,6 @@ static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_connection_failures_breakdown",
|
||||
"Number of wake-up failures (per kind).",
|
||||
&["retry", "kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_io_bytes_per_client",
|
||||
@@ -254,16 +208,12 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
"handling interactive connection from client"
|
||||
);
|
||||
|
||||
let proto = mode.protocol_label();
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
// The `closed` counter will increase when this future is destroyed.
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.with_label_values(&[mode.protocol_label()])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
|
||||
}
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
@@ -298,7 +248,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
mode.allow_self_signed_compute(config),
|
||||
);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session, mode))
|
||||
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -447,46 +397,6 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
const fn bool_to_str(x: bool) -> &'static str {
|
||||
if x {
|
||||
"true"
|
||||
} else {
|
||||
"false"
|
||||
}
|
||||
}
|
||||
|
||||
fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
use crate::console::errors::ApiError;
|
||||
let retry = bool_to_str(retry);
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
"quota_exceeded"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => "api_console_locked",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => "api_console_bad_request",
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
"api_console_other_server_error"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
|
||||
};
|
||||
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
|
||||
}
|
||||
|
||||
/// Try to connect to the compute node, retrying if necessary.
|
||||
/// This function might update `node_info`, so we take it by `&mut`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -530,12 +440,10 @@ where
|
||||
match handle_try_wake(wake_res, num_retries) {
|
||||
Err(e) => {
|
||||
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
|
||||
report_error(&e, false);
|
||||
return Err(e.into());
|
||||
}
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(ControlFlow::Continue(e)) => {
|
||||
report_error(&e, true);
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
@@ -774,7 +682,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
async fn connect_to_db(
|
||||
self,
|
||||
session: cancellation::Session<'_>,
|
||||
mode: ClientMode,
|
||||
allow_cleartext: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let Self {
|
||||
mut stream,
|
||||
@@ -790,7 +698,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
};
|
||||
|
||||
let auth_result = match creds
|
||||
.authenticate(&extra, &mut stream, mode.allow_cleartext())
|
||||
.authenticate(&extra, &mut stream, allow_cleartext)
|
||||
.await
|
||||
{
|
||||
Ok(auth_result) => auth_result,
|
||||
@@ -816,14 +724,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
let proto = mode.protocol_label();
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
}
|
||||
|
||||
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
// PqStream input buffer. Normally there is none, but our serverless npm
|
||||
|
||||
@@ -33,9 +33,13 @@ reqwest = { workspace = true, default-features = false, features = ["rustls-tls"
|
||||
aws-config = { workspace = true, default-features = false, features = ["rustls", "credentials-sso"] }
|
||||
|
||||
pageserver = { path = "../pageserver" }
|
||||
remote_storage = { path = "../libs/remote_storage" }
|
||||
|
||||
tracing.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
tracing-subscriber = { version = "0.3.17", features = ["ansi"] }
|
||||
clap.workspace = true
|
||||
tracing-appender = "0.2"
|
||||
histogram = "0.7"
|
||||
humantime.workspace = true
|
||||
camino.workspace = true
|
||||
aws-smithy-types = "0.56.1"
|
||||
|
||||
@@ -4,7 +4,6 @@ pub mod delete_batch_producer;
|
||||
pub mod metadata_stream;
|
||||
mod s3_deletion;
|
||||
pub mod scan_metadata;
|
||||
|
||||
use std::env;
|
||||
use std::fmt::Display;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -16,6 +16,8 @@ use tracing::{info, warn};
|
||||
|
||||
use clap::{Parser, Subcommand, ValueEnum};
|
||||
|
||||
mod restore_tenant_from_object_versioning;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
#[command(arg_required_else_help(true))]
|
||||
@@ -59,6 +61,9 @@ enum Command {
|
||||
skip_validation: bool,
|
||||
},
|
||||
ScanMetadata {},
|
||||
RestoreTenantFromObjectVersioningMostRecentIndexPart(
|
||||
restore_tenant_from_object_versioning::Command,
|
||||
),
|
||||
}
|
||||
|
||||
async fn tidy(
|
||||
@@ -247,5 +252,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
},
|
||||
Command::RestoreTenantFromObjectVersioningMostRecentIndexPart(arg) => {
|
||||
match restore_tenant_from_object_versioning::doit(arg).await {
|
||||
Err(e) => {
|
||||
tracing::error!("Failed: {e:?}");
|
||||
Err(e)
|
||||
}
|
||||
Ok(()) => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
513
s3_scrubber/src/restore_tenant_from_object_versioning.rs
Normal file
513
s3_scrubber/src/restore_tenant_from_object_versioning.rs
Normal file
@@ -0,0 +1,513 @@
|
||||
//! Restore pageserver state from S3 object versioning.
|
||||
//!
|
||||
//! This subcommand allows restoring a tenant's pageserver S3 state from S3 object versioning.
|
||||
//!
|
||||
//! # Instructions
|
||||
//!
|
||||
//! - Run command
|
||||
//! ```
|
||||
//! SSO_ACCOUNT_ID=... REGION=... \
|
||||
//! BUCKET=neon-{prod,staging}-storage-... \
|
||||
//! cargo run -p s3_scrubber \
|
||||
//! restore-tenant-from-object-versioning-most-recent-index-part \
|
||||
//! TENANT_TO_RESTORE \
|
||||
//! ./restore \
|
||||
//! timeline-list TIMELINE_TO_RESTORE TIMELINE_TO_RESTORE ...
|
||||
//! ```
|
||||
//! - `./restore` now contains the timeline state referenced by the latest `index_part.json`s of the
|
||||
//! specified timelines in the `timeline-list` argument
|
||||
//! - Use `cargo neon` to start a pageserver
|
||||
//! - `rm -rf .neon`
|
||||
//! - `cargo neon init`
|
||||
//! - `sed -i 's/\(.*control_plane_api.*\)/#\1/' .neon/config`
|
||||
//! - `sed -i 's/\(.*control_plane_api.*\)/#\1/' .neon/pageserver_1/pageserver.toml`
|
||||
//! - configure the pageserver remote storage config to point to the restore directory.
|
||||
//! Use your text editor to edit the TOML file: `.neon/pageserver_1/pageserver.toml`.
|
||||
//! ```
|
||||
//! [remote_storage]
|
||||
//! local_path = "/path/to/restore/pageserver/v1"
|
||||
//! ```
|
||||
//! - `cargo neon start`
|
||||
//! - make sure attaching the tenant works
|
||||
//! - `curl -X POST localhost:9898/v1/tenant/TENANT_TO_RESTORE/attach`
|
||||
//! - check `curl -X GET localhost:9898/v1/tenant/TENANT_TO_RESTORE | jq`
|
||||
//! - for each timeline $timeline_id to restore:
|
||||
//! - `cargo neon mappings map --branch-name restore-$timeline_id --tenant-id TENANT_TO_RESTORE --timeline-id $timeline_id`
|
||||
//! - `cargo neon endpoint create --tenant-id TENANT_TO_RESTORE --branch-name restore-$timeline_id ep-restore-$timeline_id`
|
||||
//! - `cargo neon endpoint start --tenant-id TENANT_TO_RESTORE ep-restore-$timeline_id`
|
||||
//! - it prints a connection string, looking like `postgresql://cloud_admin@127.0.0.1:PORT/DB`
|
||||
//! - dump database contents using postgres tools
|
||||
//! - determine PG version `$restore_pg_version` using
|
||||
//! ```
|
||||
//! curl -s -X GET localhost:9898/v1/tenant/TENANT_TO_RESTORE/timeline/$timeline_id | jq .pg_version
|
||||
//! ```
|
||||
//! - pg_dumpall
|
||||
//! ```
|
||||
//! ./pg_install/$restore_pg_version/bin/pg_dumpall -d THE_CONNECTION_STRING/postgres > ./restore/pg_dumpall.out
|
||||
//! ```
|
||||
//! - pg_dump a specific database
|
||||
//! ```
|
||||
//! ./pg_install/v15/bin/pg_dump -d 'THE_CONNECTION_STRING/THEDBTODUMP' > ./restore/pg_dump_THEDBTODUMP.out
|
||||
//! ```
|
||||
//! - `cargo neon endpoint stop --tenant-id TENANT_TO_RESTORE ep-restore-$timeline_id`
|
||||
//!
|
||||
//! - Use the pg_dump files to restore the database into a new Neon project.
|
||||
//!
|
||||
//! # Limitations & Future Work
|
||||
//!
|
||||
//! Just restoring Pageserver S3 state restores a consistent state at an LSN that is NOT THE LAST COMMIT LSN.
|
||||
//! The reason is that Pageserver uploads layers to S3 with implementation-specific delays that are optimized for day-to-day operation.
|
||||
//!
|
||||
//! If we still had the Safekeeper WAL, we could restore the Safekeeper S3 state in a similar way.
|
||||
//! In that case, we wouldn't need the `pg_dump` step.
|
||||
//! We would simply attach the tenant to safekeepers and pageservers.
|
||||
//! When attaching to Safekeeper, we would need to tell it that PS remote_consistent_lsn is the restore-point-LSN,
|
||||
//! i.e., the LSNs in the restored index_part.json's in Pageserver S3 state.
|
||||
//! Pageserver attach would pick up the restored state from S3, and the Safekeeper & Pageserver would
|
||||
//! resume normal operation as if the clock had been wound back to restore-point-LSN.
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
num::{NonZeroU32, NonZeroUsize},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
use aws_sdk_s3::operation::list_object_versions::ListObjectVersionsOutput;
|
||||
use aws_types::region::Region;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver::tenant::{
|
||||
remote_timeline_client::index::LayerFileMetadata, TENANTS_SEGMENT_NAME,
|
||||
TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind};
|
||||
use s3_scrubber::{init_logging, init_s3_client, BucketConfig};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
#[derive(Debug, Clone, clap::Subcommand)]
|
||||
enum ResurrectTimelines {
|
||||
TimelineList { timeline_ids: Vec<TimelineId> },
|
||||
// AllTimelinesDeletedAfter { timestamp: humantime::Timestamp },
|
||||
}
|
||||
|
||||
#[derive(clap::Args)]
|
||||
pub(crate) struct Command {
|
||||
tenant_id: TenantId,
|
||||
dest_dir: Utf8PathBuf,
|
||||
#[clap(short, long)]
|
||||
dry_run: bool,
|
||||
#[clap(subcommand)]
|
||||
timelines: ResurrectTimelines,
|
||||
}
|
||||
|
||||
pub(crate) async fn doit(args: Command) -> anyhow::Result<()> {
|
||||
let _logging_guard = {
|
||||
let log_prefix = format!("restore_tenant_from_object_versioning_{}", args.tenant_id);
|
||||
let dry_suffix = if args.dry_run { "__dry" } else { "" };
|
||||
let file_name = {
|
||||
format!(
|
||||
"{}_{}{}.log",
|
||||
log_prefix,
|
||||
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S"),
|
||||
dry_suffix,
|
||||
)
|
||||
};
|
||||
init_logging(&file_name)
|
||||
};
|
||||
|
||||
let restore_dst = if tokio::fs::try_exists(&args.dest_dir).await? {
|
||||
anyhow::bail!("destination directory already exists: {}", args.dest_dir,);
|
||||
} else {
|
||||
GenericRemoteStorage::from_config(&RemoteStorageConfig {
|
||||
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
|
||||
max_sync_errors: NonZeroU32::new(1).unwrap(), // ???? would want so specify 0
|
||||
storage: RemoteStorageKind::LocalFs(args.dest_dir.clone()),
|
||||
})
|
||||
.context("instantiate restore destination")?
|
||||
};
|
||||
|
||||
let bucket_config = BucketConfig::from_env()?;
|
||||
|
||||
let bucket_region = Region::new(bucket_config.region);
|
||||
let delimiter = "/".to_string();
|
||||
let s3_client = Arc::new(init_s3_client(bucket_config.sso_account_id, bucket_region));
|
||||
let tenant_root = [
|
||||
"pageserver",
|
||||
"v1",
|
||||
TENANTS_SEGMENT_NAME,
|
||||
&args.tenant_id.to_string(),
|
||||
]
|
||||
.join(&delimiter);
|
||||
|
||||
let tenant_delete_marker = [&tenant_root, TENANT_DELETED_MARKER_FILE_NAME].join(&delimiter);
|
||||
|
||||
// - Ensure the prefix is empty when ignoring existence of versions.
|
||||
// - Ensure the tenant delete marker key is part of `DeleteMarkers`. If it isn't, the tenant hasn't finished deletion yet and we should let pageservers complete it first.
|
||||
// - Restore each index_part.json based on the version in DeleteMarkers as well as the layers it references. For the layers, also use the version in DeleteMarkers and ensure it is the latest.
|
||||
// - Remove the deleted_at mark for the specified timelines.
|
||||
//
|
||||
// Notes:
|
||||
// - The restore will happen in-place because it's hard to change tenant/timeline ids.
|
||||
// - The restore could be interrupted mid-way.
|
||||
// - Hence, separate plan-making and plan-execution.
|
||||
|
||||
async {
|
||||
info!("send request");
|
||||
let res = s3_client
|
||||
.list_objects_v2()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.prefix(&tenant_root)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
info!(response=?res, "got response");
|
||||
|
||||
if res.key_count() > 0 {
|
||||
anyhow::bail!("tenant prefix is not empty in S3");
|
||||
}
|
||||
if res.is_truncated() {
|
||||
unimplemented!("can this even happen")
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
.instrument(info_span!("ensure prefix empty"))
|
||||
.await
|
||||
.context("ensure prefix is empty")?;
|
||||
|
||||
async {
|
||||
info!("send request");
|
||||
let res = s3_client
|
||||
.list_object_versions()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.prefix(&tenant_delete_marker)
|
||||
.send()
|
||||
.await?;
|
||||
debug!(response=?res, "got response");
|
||||
|
||||
if res.is_truncated() {
|
||||
unimplemented!("can this even happen")
|
||||
}
|
||||
|
||||
let markers = res
|
||||
.delete_markers()
|
||||
.context("expected delete marker in response")?;
|
||||
|
||||
if markers.len() != 1 {
|
||||
anyhow::bail!("expected exactly one delete marker because we create and delete the marker exactly once, got {}", markers.len());
|
||||
}
|
||||
|
||||
if !markers[0].is_latest() {
|
||||
anyhow::bail!("expected delete marker to have IsLatest set: {:?}", markers[0]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.instrument(info_span!(
|
||||
"ensure tenant delete marker exists in DeleteMarkers",
|
||||
tenant_delete_marker,
|
||||
))
|
||||
.await
|
||||
.context("ensure tenant delete marker exists in DeleteMarkers")?;
|
||||
|
||||
let timelines = match args.timelines {
|
||||
ResurrectTimelines::TimelineList { timeline_ids } => timeline_ids,
|
||||
};
|
||||
|
||||
// Fetch all the information we need to execute the restore.
|
||||
let version_responses_by_timeline = async {
|
||||
let mut out: HashMap<TimelineId, Vec<Arc<ListObjectVersionsOutput>>> = Default::default();
|
||||
for tl in &timelines {
|
||||
async {
|
||||
let timeline_prefix = [tenant_root.as_str(), TIMELINES_SEGMENT_NAME , &tl.to_string()].join(&delimiter);
|
||||
let mut next_key_marker = None;
|
||||
let mut next_version_id_marker = None;
|
||||
loop {
|
||||
info!("sending request");
|
||||
let res: ListObjectVersionsOutput = s3_client.list_object_versions()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.prefix(&timeline_prefix)
|
||||
.set_key_marker(next_key_marker.take())
|
||||
.set_version_id_marker(next_version_id_marker.take())
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let res = Arc::new(res);
|
||||
out.entry(*tl).or_default().push(Arc::clone(&res));
|
||||
|
||||
info!("got response");
|
||||
match res.versions() {
|
||||
Some(versions) => {
|
||||
for version in versions {
|
||||
info!("version: {:?}", version);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
info!("no versions");
|
||||
}
|
||||
}
|
||||
match res.delete_markers() {
|
||||
Some(markers) => {
|
||||
for marker in markers {
|
||||
info!("delete marker: {:?}", marker);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
info!("no delete markers");
|
||||
}
|
||||
}
|
||||
|
||||
if !res.is_truncated() {
|
||||
break;
|
||||
}
|
||||
next_key_marker = res.next_key_marker().map(|s| s.to_string());
|
||||
next_version_id_marker = res.next_version_id_marker().map(|s| s.to_string());
|
||||
if let (None, None) = (&next_key_marker, &next_version_id_marker) {
|
||||
anyhow::bail!("s3 returned is_truncated=true but neither next_key_marker nor next_version_id_marker are set");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}.instrument(info_span!("timeline", timeline_id=%tl)).await?;
|
||||
}
|
||||
anyhow::Ok(out)
|
||||
}.instrument(info_span!("list all object versions and delete markers")).await?;
|
||||
#[derive(Debug)]
|
||||
struct LatestVersion {
|
||||
key: String,
|
||||
last_modified: aws_smithy_types::DateTime,
|
||||
version_id: String,
|
||||
}
|
||||
let find_latest_version_based_on_delete_marker_last_modified = |tl: &TimelineId, key: &str| {
|
||||
let restore_version_delete_marker = {
|
||||
let mut candidates = Vec::new();
|
||||
for res in &version_responses_by_timeline[tl] {
|
||||
let Some(markers) = res.delete_markers() else {
|
||||
continue;
|
||||
};
|
||||
for marker in markers {
|
||||
if !marker.is_latest() {
|
||||
continue;
|
||||
}
|
||||
if marker.key().unwrap() != key {
|
||||
continue;
|
||||
}
|
||||
candidates.push(LatestVersion {
|
||||
key: marker.key().unwrap().to_owned(),
|
||||
last_modified: marker.last_modified().unwrap().clone(),
|
||||
version_id: marker.version_id().unwrap().to_owned(),
|
||||
});
|
||||
}
|
||||
}
|
||||
info!(?candidates, "marker candidates");
|
||||
if candidates.len() != 1 {
|
||||
anyhow::bail!("expected exactly one IsLatest, got {}", candidates.len());
|
||||
}
|
||||
candidates.pop().unwrap()
|
||||
};
|
||||
info!(?restore_version_delete_marker, "found marker");
|
||||
|
||||
// There's no way to get the latest version from the delete marker.
|
||||
// But, we observe (can't find written guarantee) that the Delete Marker's "Last Modified" is >= the latest version.
|
||||
// So, find latest version based on that.
|
||||
let restore_version = {
|
||||
let mut candidates = Vec::new();
|
||||
for res in &version_responses_by_timeline[tl] {
|
||||
let Some(versions) = res.versions() else {
|
||||
continue;
|
||||
};
|
||||
for version in versions {
|
||||
if version.key().unwrap() != restore_version_delete_marker.key {
|
||||
continue;
|
||||
}
|
||||
candidates.push(LatestVersion {
|
||||
key: version.key().unwrap().to_owned(),
|
||||
last_modified: version.last_modified().unwrap().clone(),
|
||||
version_id: version.version_id().unwrap().to_owned(),
|
||||
});
|
||||
}
|
||||
}
|
||||
candidates.sort_by_key(|v| v.last_modified.clone());
|
||||
info!(?candidates, "version candidates");
|
||||
if candidates.is_empty() {
|
||||
anyhow::bail!(
|
||||
"expected at least one version matching the delete marker's key, got none"
|
||||
);
|
||||
}
|
||||
{
|
||||
let mut uniq = HashSet::new();
|
||||
for v in &candidates {
|
||||
if !uniq.insert(v.last_modified.clone()) {
|
||||
anyhow::bail!("last_modified timestamps are not unique, don't know which version to pick");
|
||||
}
|
||||
}
|
||||
}
|
||||
candidates.pop().unwrap() // we sorted ascending, so, pop() is the latest
|
||||
};
|
||||
anyhow::Ok(restore_version)
|
||||
};
|
||||
|
||||
let latest_index_part_versions: HashMap<TimelineId, LatestVersion> = {
|
||||
let span = info_span!("find index part version");
|
||||
let _enter = span.enter();
|
||||
|
||||
// The latest index part for a deleted tenant is always a DeletedMarker
|
||||
let mut out = HashMap::new();
|
||||
for tl in &timelines {
|
||||
let span = info_span!("timeline", timeline_id=%tl);
|
||||
let _enter = span.enter();
|
||||
let restore_version = find_latest_version_based_on_delete_marker_last_modified(
|
||||
tl,
|
||||
// TODO: support generation numbers
|
||||
&[
|
||||
&tenant_root,
|
||||
TIMELINES_SEGMENT_NAME,
|
||||
&tl.to_string(),
|
||||
pageserver::tenant::IndexPart::FILE_NAME,
|
||||
]
|
||||
.join(&delimiter),
|
||||
)?;
|
||||
out.insert(*tl, restore_version);
|
||||
}
|
||||
out
|
||||
};
|
||||
|
||||
let index_part_contents: HashMap<TimelineId, pageserver::tenant::IndexPart> = async {
|
||||
let mut out = HashMap::new();
|
||||
|
||||
for tl in &timelines {
|
||||
async {
|
||||
let v = &latest_index_part_versions[tl];
|
||||
|
||||
let mut body_buf = Vec::new();
|
||||
info!("send request");
|
||||
let res = s3_client
|
||||
.get_object()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.key(&v.key)
|
||||
.version_id(&v.version_id)
|
||||
.send()
|
||||
.await?;
|
||||
info!(?res, "got response header");
|
||||
|
||||
res.body
|
||||
.into_async_read()
|
||||
.read_to_end(&mut body_buf)
|
||||
.await?;
|
||||
|
||||
let body_buf = String::from_utf8(body_buf)?;
|
||||
info!(body_buf, "received response body");
|
||||
|
||||
let mut index_part: pageserver::tenant::IndexPart =
|
||||
serde_json::from_str(&body_buf)?;
|
||||
info!(?index_part, "parsed index part");
|
||||
|
||||
let deleted_at = index_part.deleted_at.take();
|
||||
info!(
|
||||
?deleted_at,
|
||||
"removing deleted_at field from index part, previous value logged here"
|
||||
);
|
||||
|
||||
let updated_buf = serde_json::to_vec(&index_part)?;
|
||||
let updated_buf_len = updated_buf.len();
|
||||
info!("uploading modified index part to restore_dst");
|
||||
restore_dst
|
||||
.upload(
|
||||
std::io::Cursor::new(updated_buf),
|
||||
updated_buf_len,
|
||||
&RemotePath::from_string(&v.key).unwrap(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context("upload modified index part to restore_dst")?;
|
||||
|
||||
out.insert(*tl, index_part);
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(info_span!("timeline", timeline_id=%tl))
|
||||
.await?;
|
||||
}
|
||||
|
||||
anyhow::Ok(out)
|
||||
}
|
||||
.instrument(info_span!("get index part contents"))
|
||||
.await
|
||||
.context("get index part contents")?;
|
||||
|
||||
async {
|
||||
for (tl, index_part) in &index_part_contents {
|
||||
async {
|
||||
for (layer_file_name, layer_md) in &index_part.layer_metadata {
|
||||
let layer_md: LayerFileMetadata = layer_md.into();
|
||||
async {
|
||||
// TODO: support generations
|
||||
let layer_file_key = [
|
||||
&tenant_root,
|
||||
TIMELINES_SEGMENT_NAME,
|
||||
&tl.to_string(),
|
||||
&layer_file_name.file_name(),
|
||||
]
|
||||
.join(&delimiter);
|
||||
|
||||
// The latest index parts naturally reference the latest layers.
|
||||
// So, a deleted tenant's latest layers are the ones in DeleteMarkers.
|
||||
//
|
||||
// If we want to support restoring from not-latest index part, this will require more work.
|
||||
// The idea is to
|
||||
// 1. every index_part.json that we upload contains a strongly monotonically increasing sequence number
|
||||
// 2. every image layer that we upload is S3-metadata-tagged with the sequence number of the IndexPart
|
||||
// in which it first appeared.
|
||||
// This allows to recover the correct layer object version, even if we have a bug that overwrites layers.
|
||||
let restore_version =
|
||||
find_latest_version_based_on_delete_marker_last_modified(
|
||||
tl,
|
||||
&layer_file_key,
|
||||
)?;
|
||||
|
||||
// TODO: teach RemoteStorage copy operation so we can use s3_client.copy_object()
|
||||
async {
|
||||
let res = s3_client
|
||||
.get_object()
|
||||
.bucket(&bucket_config.bucket)
|
||||
.key(&restore_version.key)
|
||||
.version_id(&restore_version.version_id)
|
||||
.send()
|
||||
.await
|
||||
.context("get object header")?;
|
||||
// TODO: instead of file_size(), do actual data integrity checking.
|
||||
restore_dst
|
||||
.upload(
|
||||
res.body.into_async_read(),
|
||||
layer_md.file_size().try_into().unwrap(),
|
||||
&RemotePath::from_string(&restore_version.key).unwrap(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context("download-body-and-upload")?;
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(info_span!("copy", layer_file_name=%layer_file_name))
|
||||
.await?;
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(info_span!("layer", layer_file_name=%layer_file_name))
|
||||
.await?;
|
||||
}
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(info_span!("timeline", timeline_id=%tl))
|
||||
.await?;
|
||||
}
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
.instrument(info_span!("download layer files into restore_dst"))
|
||||
.await
|
||||
.context("download layers into restore dst")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -374,12 +374,8 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
if conf.http_auth.is_some() {
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
|
||||
["/v1/status", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect()
|
||||
});
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> =
|
||||
Lazy::new(|| ["/v1/status"].iter().map(|v| v.parse().unwrap()).collect());
|
||||
if ALLOWLIST_ROUTES.contains(request.uri()) {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -65,7 +65,7 @@ def start_heavy_write_workload(env: PgCompare, n_tables: int, scale: int, num_it
|
||||
|
||||
def start_single_table_workload(table_id: int):
|
||||
for _ in range(num_iters):
|
||||
with env.pg.connect(options="-cstatement_timeout=300s").cursor() as cur:
|
||||
with env.pg.connect().cursor() as cur:
|
||||
cur.execute(
|
||||
f"INSERT INTO t{table_id} SELECT FROM generate_series(1,{new_rows_each_update})"
|
||||
)
|
||||
|
||||
@@ -188,7 +188,7 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200, response.text
|
||||
assert response.status_code == 200
|
||||
return response.json()
|
||||
|
||||
rows = q("select 42 as answer")["rows"]
|
||||
@@ -206,12 +206,6 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
|
||||
assert rows == [{"answer": {"b": 42}}]
|
||||
|
||||
rows = q("select $1::jsonb[] as answer", [[{}]])["rows"]
|
||||
assert rows == [{"answer": [{}]}]
|
||||
|
||||
rows = q("select $1::jsonb[] as answer", [[{"foo": 1}, {"bar": 2}]])["rows"]
|
||||
assert rows == [{"answer": [{"foo": 1}, {"bar": 2}]}]
|
||||
|
||||
rows = q("select * from pg_class limit 1")["rows"]
|
||||
assert len(rows) == 1
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
import toml
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
|
||||
@@ -598,10 +597,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
|
||||
assert res == expected_sum
|
||||
|
||||
|
||||
# Do inserts while restarting postgres and messing with safekeeper addresses.
|
||||
# The test takes more than default 5 minutes on Postgres 16,
|
||||
# see https://github.com/neondatabase/neon/issues/5305
|
||||
@pytest.mark.timeout(600)
|
||||
# do inserts while restarting postgres and messing with safekeeper addresses
|
||||
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
Reference in New Issue
Block a user