diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 197b83fac4..1c0971a49d 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -1175,7 +1175,7 @@ jobs: -f deployPgSniRouter=false \ -f deployProxy=false \ -f deployStorage=true \ - -f deployStorageBroker=true \ + -f deployStorageBroker=false \ -f deployStorageController=true \ -f branch=main \ -f dockerTag=${{needs.meta.outputs.build-tag}} \ @@ -1183,7 +1183,7 @@ jobs: gh workflow --repo neondatabase/infra run deploy-prod.yml --ref main \ -f deployStorage=true \ - -f deployStorageBroker=true \ + -f deployStorageBroker=false \ -f deployStorageController=true \ -f branch=main \ -f dockerTag=${{needs.meta.outputs.build-tag}} diff --git a/deny.toml b/deny.toml index ed7aa9ef9f..1023b1833a 100644 --- a/deny.toml +++ b/deny.toml @@ -31,6 +31,10 @@ reason = "the marvin attack only affects private key decryption, not public key id = "RUSTSEC-2024-0436" reason = "The paste crate is a build-only dependency with no runtime components. It is unlikely to have any security impact." +[[advisories.ignore]] +id = "RUSTSEC-2025-0014" +reason = "The humantime crate is widely used and is not easy to replace right now. It is unmaintained, but it has no known vulnerabilities to care about. #11179" + # This section is considered when running `cargo deny check licenses` # More documentation for the licenses section can be found here: # https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 13a9b5d89e..b1ebad83b1 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1476,8 +1476,14 @@ pub struct TenantScanRemoteStorageResponse { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "snake_case")] pub enum TenantSorting { + /// Total size of layers on local disk for all timelines in a shard. ResidentSize, + /// The logical size of the largest timeline within a _tenant_ (not shard). Only tracked on + /// shard 0, contains the sum across all shards. MaxLogicalSize, + /// The logical size of the largest timeline within a _tenant_ (not shard), divided by the number of + /// shards. Only tracked on shard 0, and estimates the per-shard logical size. + MaxLogicalSizePerShard, } impl Default for TenantSorting { @@ -1507,14 +1513,20 @@ pub struct TopTenantShardsRequest { pub struct TopTenantShardItem { pub id: TenantShardId, - /// Total size of layers on local disk for all timelines in this tenant + /// Total size of layers on local disk for all timelines in this shard. pub resident_size: u64, - /// Total size of layers in remote storage for all timelines in this tenant + /// Total size of layers in remote storage for all timelines in this shard. pub physical_size: u64, - /// The largest logical size of a timeline within this tenant + /// The largest logical size of a timeline within this _tenant_ (not shard). This is only + /// tracked on shard 0, and contains the sum of the logical size across all shards. pub max_logical_size: u64, + + /// The largest logical size of a timeline within this _tenant_ (not shard) divided by the number of + /// shards. This is only tracked on shard 0, and is only an estimate as we divide it evenly by + /// shard count, rounded up. 
+ pub max_logical_size_per_shard: u64, } #[derive(Serialize, Deserialize, Debug, Default)] diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index c70cb598de..08a06163e1 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -284,6 +284,18 @@ impl Client { simple_query::batch_execute(self.inner(), query).await } + pub async fn discard_all(&self) -> Result { + // clear the prepared statements that are about to be nuked from the postgres session + { + let mut typeinfo = self.inner.cached_typeinfo.lock(); + typeinfo.typeinfo = None; + typeinfo.typeinfo_composite = None; + typeinfo.typeinfo_enum = None; + } + + self.batch_execute("discard all").await + } + /// Begins a new database transaction. /// /// The transaction will roll back by default - use the `commit` method to commit it. diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 77bfab47e0..e5848bfd25 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3223,6 +3223,7 @@ async fn post_top_tenants( match order_by { TenantSorting::ResidentSize => sizes.resident_size, TenantSorting::MaxLogicalSize => sizes.max_logical_size, + TenantSorting::MaxLogicalSizePerShard => sizes.max_logical_size_per_shard, } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 3a34c8e254..62e1cdac0c 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3842,6 +3842,7 @@ impl Tenant { resident_size: 0, physical_size: 0, max_logical_size: 0, + max_logical_size_per_shard: 0, }; for timeline in self.timelines.lock().unwrap().values() { @@ -3858,6 +3859,10 @@ impl Tenant { ); } + result.max_logical_size_per_shard = result + .max_logical_size + .div_ceil(self.tenant_shard_id.shard_count.count() as u64); + result } } diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 97978aefb9..af250bf718 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -175,6 +175,7 @@ impl BlobWriter { start_offset: u64, gate: &utils::sync::gate::Gate, ctx: &RequestContext, + flush_task_span: tracing::Span, ) -> anyhow::Result { Ok(Self { io_buf: Some(BytesMut::new()), @@ -184,6 +185,7 @@ impl BlobWriter { || IoBufferMut::with_capacity(Self::CAPACITY), gate.enter()?, ctx, + flush_task_span, ), offset: start_offset, }) @@ -331,6 +333,7 @@ pub(crate) mod tests { use camino::Utf8PathBuf; use camino_tempfile::Utf8TempDir; use rand::{Rng, SeedableRng}; + use tracing::info_span; use super::*; use crate::context::DownloadBehavior; @@ -354,7 +357,7 @@ pub(crate) mod tests { let mut offsets = Vec::new(); { let file = Arc::new(VirtualFile::create_v2(pathbuf.as_path(), ctx).await?); - let mut wtr = BlobWriter::new(file, 0, &gate, ctx).unwrap(); + let mut wtr = BlobWriter::new(file, 0, &gate, ctx, info_span!("test")).unwrap(); for blob in blobs.iter() { let (_, res) = if compression { let res = wtr diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index ee4eb15748..a847589279 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -9,7 +9,7 @@ use camino::Utf8PathBuf; use num_traits::Num; use pageserver_api::shard::TenantShardId; use tokio_epoll_uring::{BoundedBuf, Slice}; -use tracing::error; +use tracing::{error, info_span}; use utils::id::TimelineId; use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; @@ -77,6 +77,7 @@ impl EphemeralFile { || 
IoBufferMut::with_capacity(TAIL_SZ), gate.enter()?, ctx, + info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename), ), _gate_guard: gate.enter()?, }) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 9b3c4ee243..6aba75fa56 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -18,7 +18,7 @@ use tokio::fs::{self, File, OpenOptions}; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio_util::io::StreamReader; use tokio_util::sync::CancellationToken; -use tracing::warn; +use tracing::{info_span, warn}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use utils::{backoff, pausable_failpoint}; @@ -230,6 +230,7 @@ async fn download_object( || IoBufferMut::with_capacity(super::BUFFER_SIZE), gate.enter().map_err(|_| DownloadError::Cancelled)?, ctx, + info_span!(parent: None, "download_object_buffered_writer", %dst_path), ); // TODO: use vectored write (writev) once supported by tokio-epoll-uring. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 1d6e57fda5..bb101784d3 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -433,7 +433,13 @@ impl DeltaLayerWriterInner { let file = Arc::new(VirtualFile::create_v2(&path, ctx).await?); // Start at PAGE_SZ, make room for the header block - let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, ctx)?; + let blob_writer = BlobWriter::new( + file, + PAGE_SZ as u64, + gate, + ctx, + info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), + )?; // Initialize the b-tree index builder let block_buf = BlockBuf::new(); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index aa0f3fbff6..6674838169 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -796,7 +796,13 @@ impl ImageLayerWriterInner { }; // Start at `PAGE_SZ` to make room for the header block. 
- let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, ctx)?; + let blob_writer = BlobWriter::new( + file, + PAGE_SZ as u64, + gate, + ctx, + info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path), + )?; // Initialize the b-tree index builder let block_buf = BlockBuf::new(); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 87e0d02773..610c706bd8 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -393,6 +393,9 @@ impl GcCompactionQueue { if job.dry_run { flags |= CompactFlags::DryRun; } + if options.flags.contains(CompactFlags::NoYield) { + flags |= CompactFlags::NoYield; + } let options = CompactOptions { flags, sub_compaction: false, @@ -1092,7 +1095,7 @@ impl Timeline { let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn(); tracing::info!( - "latest_gc_cutoff: {}, pitr cutoff {}", + "starting shard ancestor compaction, latest_gc_cutoff: {}, pitr cutoff {}", *latest_gc_cutoff, self.gc_info.read().unwrap().cutoffs.time ); @@ -1121,6 +1124,7 @@ impl Timeline { // Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being // wrong. If ShardedRange claims the local page count is zero, then no keys in this layer // should be !is_key_disposable() + // TODO: exclude sparse keyspace from this check, otherwise it will infinitely loop. let range = layer_desc.get_key_range(); let mut key = range.start; while key < range.end { @@ -2619,6 +2623,7 @@ impl Timeline { ) -> Result { let sub_compaction = options.sub_compaction; let job = GcCompactJob::from_compact_options(options.clone()); + let no_yield = options.flags.contains(CompactFlags::NoYield); if sub_compaction { info!( "running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs" @@ -2633,14 +2638,15 @@ impl Timeline { idx + 1, jobs_len ); - self.compact_with_gc_inner(cancel, job, ctx).await?; + self.compact_with_gc_inner(cancel, job, ctx, no_yield) + .await?; } if jobs_len == 0 { info!("no jobs to run, skipping gc bottom-most compaction"); } return Ok(CompactionOutcome::Done); } - self.compact_with_gc_inner(cancel, job, ctx).await + self.compact_with_gc_inner(cancel, job, ctx, no_yield).await } async fn compact_with_gc_inner( @@ -2648,6 +2654,7 @@ impl Timeline { cancel: &CancellationToken, job: GcCompactJob, ctx: &RequestContext, + no_yield: bool, ) -> Result { // Block other compaction/GC tasks from running for now. GC-compaction could run along // with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc. 
@@ -2917,14 +2924,18 @@ impl Timeline { if cancel.is_cancelled() { return Err(CompactionError::ShuttingDown); } - let should_yield = self - .l0_compaction_trigger - .notified() - .now_or_never() - .is_some(); - if should_yield { - tracing::info!("preempt gc-compaction when downloading layers: too many L0 layers"); - return Ok(CompactionOutcome::YieldForL0); + if !no_yield { + let should_yield = self + .l0_compaction_trigger + .notified() + .now_or_never() + .is_some(); + if should_yield { + tracing::info!( + "preempt gc-compaction when downloading layers: too many L0 layers" + ); + return Ok(CompactionOutcome::YieldForL0); + } } let resident_layer = layer .download_and_keep_resident(ctx) @@ -3058,16 +3069,21 @@ impl Timeline { if cancel.is_cancelled() { return Err(CompactionError::ShuttingDown); } - keys_processed += 1; - if keys_processed % 1000 == 0 { - let should_yield = self - .l0_compaction_trigger - .notified() - .now_or_never() - .is_some(); - if should_yield { - tracing::info!("preempt gc-compaction in the main loop: too many L0 layers"); - return Ok(CompactionOutcome::YieldForL0); + + if !no_yield { + keys_processed += 1; + if keys_processed % 1000 == 0 { + let should_yield = self + .l0_compaction_trigger + .notified() + .now_or_never() + .is_some(); + if should_yield { + tracing::info!( + "preempt gc-compaction in the main loop: too many L0 layers" + ); + return Ok(CompactionOutcome::YieldForL0); + } } } if self.shard_identity.is_key_disposable(&key) { diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 2ed035b489..a738732a79 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -1289,10 +1289,8 @@ impl OwnedAsyncWriter for VirtualFile { buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> std::io::Result> { - let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await; - res?; - Ok(buf) + ) -> (FullSlice, std::io::Result<()>) { + VirtualFile::write_all_at(self, buf, offset, ctx).await } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 124d8fb75a..102e3d0036 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -33,7 +33,7 @@ pub trait OwnedAsyncWriter { buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> impl std::future::Future>> + Send; + ) -> impl std::future::Future, std::io::Result<()>)> + Send; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch @@ -69,6 +69,7 @@ where buf_new: impl Fn() -> B, gate_guard: utils::sync::gate::GateGuard, ctx: &RequestContext, + flush_task_span: tracing::Span, ) -> Self { Self { writer: writer.clone(), @@ -78,6 +79,7 @@ where buf_new(), gate_guard, ctx.attached_child(), + flush_task_span, ), submit_offset: start_offset, } @@ -121,9 +123,8 @@ where let mut bytes_amount = submit_offset; if let Some(buf) = handle_tail(buf) { bytes_amount += buf.pending() as u64; - let _ = writer - .write_all_at(buf.flush(), submit_offset, &ctx) - .await?; + let (_, res) = writer.write_all_at(buf.flush(), submit_offset, &ctx).await; + let _: () = res?; } Ok((bytes_amount, writer)) } @@ -299,12 +300,12 @@ mod tests { buf: FullSlice, offset: u64, _: &RequestContext, - ) -> std::io::Result> { + ) -> (FullSlice, std::io::Result<()>) { self.writes .lock() .unwrap() .push((Vec::from(&buf[..]), offset)); - Ok(buf) + (buf, Ok(())) } } @@ -324,6 +325,7 @@ mod tests { || IoBufferMut::with_capacity(2), 
gate.enter()?, ctx, + tracing::Span::none(), ); writer.write_buffered_borrowed(b"abc", ctx).await?; diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 6804146d9a..ce9af0b06f 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,9 +1,14 @@ +use std::ops::ControlFlow; use std::{marker::PhantomData, sync::Arc}; +use once_cell::sync::Lazy; +use tokio_util::sync::CancellationToken; +use tracing::{Instrument, info, info_span, warn}; use utils::sync::duplex; use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter}; use crate::context::RequestContext; +use crate::virtual_file::MaybeFatalIo; use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned; use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; @@ -120,6 +125,7 @@ where buf: B, gate_guard: utils::sync::gate::GateGuard, ctx: RequestContext, + span: tracing::Span, ) -> Self where B: Buffer + Send + 'static, { // It is fine to buffer up to only 1 message. We only 1 message in-flight at a time. let (front, back) = duplex::mpsc::channel(1); - let join_handle = tokio::spawn(async move { - FlushBackgroundTask::new(back, file, gate_guard, ctx) - .run(buf.flush()) - .await - }); + let join_handle = tokio::spawn( + async move { + FlushBackgroundTask::new(back, file, gate_guard, ctx) + .run(buf.flush()) + .await + } + .instrument(span), + ); FlushHandle { inner: Some(FlushHandleInner { @@ -240,6 +249,7 @@ where /// The passed in slice is immediately sent back to the flush handle through the duplex channel. async fn run(mut self, slice: FullSlice) -> std::io::Result { // Sends the extra buffer back to the handle. + // TODO: can this ever await and/or fail? I think not. self.channel.send(slice).await.map_err(|_| { std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early") })?; @@ -255,10 +265,47 @@ where } // Write slice to disk at `offset`. - let slice = self - .writer - .write_all_at(request.slice, request.offset, &self.ctx) - .await?; + // + // Error handling happens according to the current policy of crashing + // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable). + // (The upper layers of the Pageserver write path are not equipped to retry write errors + // because they often deallocate the buffers that were already written). + // + // TODO: cancellation sensitivity. + // Without it, if we hit a bug where retrying is never successful, + // then we can't shut down the timeline/tenant/pageserver cleanly because + // layers of the Pageserver write path are holding the gate open for EphemeralFile. + // + // TODO: use utils::backoff::retry once async closures are actually usable + // + let mut slice_storage = Some(request.slice); + for attempt in 1.. 
{ + let result = async { + if attempt > 1 { + info!("retrying flush"); + } + let slice = slice_storage.take().expect( + "likely previous invocation of this future didn't get polled to completion", + ); + let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await; + slice_storage = Some(slice); + let res = res.maybe_fatal_err("owned_buffers_io flush"); + let Err(err) = res else { + return ControlFlow::Break(()); + }; + warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff"); + static NO_CANCELLATION: Lazy = Lazy::new(CancellationToken::new); + utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await; + ControlFlow::Continue(()) + } + .instrument(info_span!("flush_attempt", %attempt)) + .await; + match result { + ControlFlow::Break(()) => break, + ControlFlow::Continue(()) => continue, + } + } + let slice = slice_storage.expect("loop must have run at least once"); #[cfg(test)] { diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7ec4ec99fc..0336d63e8d 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -57,10 +57,11 @@ static void SendProposerGreeting(Safekeeper *sk); static void RecvAcceptorGreeting(Safekeeper *sk); static void SendVoteRequest(Safekeeper *sk); static void RecvVoteResponse(Safekeeper *sk); +static bool VotesCollected(WalProposer *wp); static void HandleElectedProposer(WalProposer *wp); static term_t GetHighestTerm(TermHistory *th); -static term_t GetEpoch(Safekeeper *sk); -static void DetermineEpochStartLsn(WalProposer *wp); +static term_t GetLastLogTerm(Safekeeper *sk); +static void ProcessPropStartPos(WalProposer *wp); static void SendProposerElected(Safekeeper *sk); static void StartStreaming(Safekeeper *sk); static void SendMessageToNode(Safekeeper *sk); @@ -97,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp = palloc0(sizeof(WalProposer)); wp->config = config; wp->api = api; + wp->state = WPS_COLLECTING_TERMS; wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list); @@ -518,7 +520,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * nodes are transferred from SS_VOTING to sending actual vote * requests. */ - case SS_VOTING: + case SS_WAIT_VOTING: wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk)); ResetConnection(sk); @@ -547,7 +549,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) /* * Idle state for waiting votes from quorum. */ - case SS_IDLE: + case SS_WAIT_ELECTED: wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk)); ResetConnection(sk); @@ -721,6 +723,15 @@ SendProposerGreeting(Safekeeper *sk) BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV); } +/* + * Have we received greeting from enough (quorum) safekeepers to start voting? + */ +static bool +TermsCollected(WalProposer *wp) +{ + return wp->n_connected >= wp->quorum; +} + static void RecvAcceptorGreeting(Safekeeper *sk) { @@ -754,7 +765,7 @@ RecvAcceptorGreeting(Safekeeper *sk) } /* Protocol is all good, move to voting. */ - sk->state = SS_VOTING; + sk->state = SS_WAIT_VOTING; /* * Note: it would be better to track the counter on per safekeeper basis, @@ -762,17 +773,18 @@ RecvAcceptorGreeting(Safekeeper *sk) * as is for now. */ ++wp->n_connected; - if (wp->n_connected <= wp->quorum) + if (wp->state == WPS_COLLECTING_TERMS) { /* We're still collecting terms from the majority. 
*/ wp->propTerm = Max(sk->greetResponse.term, wp->propTerm); /* Quorum is acquried, prepare the vote request. */ - if (wp->n_connected == wp->quorum) + if (TermsCollected(wp)) { wp->propTerm++; wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); + wp->state = WPS_CAMPAIGN; wp->voteRequest.pam.tag = 'v'; wp->voteRequest.generation = wp->mconf.generation; wp->voteRequest.term = wp->propTerm; @@ -787,12 +799,10 @@ } /* - * Check if we have quorum. If there aren't enough safekeepers, wait and - * do nothing. We'll eventually get a task when the election starts. - * - * If we do have quorum, we can start an election. + * If we have quorum, start the election (or just send a vote request to the + * newly connected node); otherwise wait until we have more greetings. */ - if (wp->n_connected < wp->quorum) + if (wp->state == WPS_COLLECTING_TERMS) { /* * SS_VOTING is an idle state; read-ready indicates the connection */ for (int j = 0; j < wp->n_safekeepers; j++) { - /* - * Remember: SS_VOTING indicates that the safekeeper is - * participating in voting, but hasn't sent anything yet. - */ - if (wp->safekeeper[j].state == SS_VOTING) + if (wp->safekeeper[j].state == SS_WAIT_VOTING) SendVoteRequest(&wp->safekeeper[j]); } } @@ -838,6 +844,8 @@ RecvVoteResponse(Safekeeper *sk) { WalProposer *wp = sk->wp; + Assert(wp->state >= WPS_CAMPAIGN); + sk->voteResponse.apm.tag = 'v'; if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse)) return; @@ -856,7 +864,7 @@ * we are not elected yet and thus need the vote. */ if ((!sk->voteResponse.voteGiven) && - (sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum)) + (sk->voteResponse.term > wp->propTerm || wp->state == WPS_CAMPAIGN)) { wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, @@ -864,38 +872,83 @@ } Assert(sk->voteResponse.term == wp->propTerm); - /* Handshake completed, do we have quorum? */ + /* ready for elected message */ + sk->state = SS_WAIT_ELECTED; + wp->n_votes++; - if (wp->n_votes < wp->quorum) + /* Are we already elected? */ + if (wp->state == WPS_CAMPAIGN) { - sk->state = SS_IDLE; /* can't do much yet, no quorum */ - } - else if (wp->n_votes > wp->quorum) - { - /* already elected, start streaming */ - SendProposerElected(sk); + /* no; check if this vote makes us elected */ + if (VotesCollected(wp)) + { + wp->state = WPS_ELECTED; + HandleElectedProposer(wp); + } + else + { + /* can't do much yet, no quorum */ + return; + } } else { - sk->state = SS_IDLE; - /* Idle state waits for read-ready events */ - wp->api.update_event_set(sk, WL_SOCKET_READABLE); - - HandleElectedProposer(sk->wp); + Assert(wp->state == WPS_ELECTED); + /* send elected only to this sk */ + SendProposerElected(sk); } } +/* + * Checks if enough votes have been collected to get elected and, if that's the + * case, finds the highest vote, setting donor, donorLastLogTerm, + * propTermStartLsn fields. Also sets truncateLsn. 
+ */ +static bool +VotesCollected(WalProposer *wp) +{ + int n_ready = 0; + + /* assumed to be called only when not elected yet */ + Assert(wp->state == WPS_CAMPAIGN); + + wp->propTermStartLsn = InvalidXLogRecPtr; + wp->donorLastLogTerm = 0; + wp->truncateLsn = InvalidXLogRecPtr; + + for (int i = 0; i < wp->n_safekeepers; i++) + { + if (wp->safekeeper[i].state == SS_WAIT_ELECTED) + { + n_ready++; + + if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm || + (GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm && + wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn)) + { + wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]); + wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn; + wp->donor = i; + } + wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); + } + } + + return n_ready >= wp->quorum; +} + /* * Called once a majority of acceptors have voted for us and current proposer * has been elected. * - * Sends ProposerElected message to all acceptors in SS_IDLE state and starts + * Sends ProposerElected message to all acceptors in SS_WAIT_ELECTED state and starts * replication from walsender. */ static void HandleElectedProposer(WalProposer *wp) { - DetermineEpochStartLsn(wp); + ProcessPropStartPos(wp); + Assert(wp->propTermStartLsn != InvalidXLogRecPtr); /* * Synchronously download WAL from the most advanced safekeeper. We do @@ -907,40 +960,24 @@ HandleElectedProposer(WalProposer *wp) wp_log(FATAL, "failed to download WAL for logical replicaiton"); } - /* - * Zero propEpochStartLsn means majority of safekeepers doesn't have any - * WAL, timeline was just created. Compute bumps it to basebackup LSN, - * otherwise we must be sync-safekeepers and we have nothing to do then. - * - * Proceeding is not only pointless but harmful, because we'd give - * safekeepers term history starting with 0/0. These hacks will go away - * once we disable implicit timeline creation on safekeepers and create it - * with non zero LSN from the start. - */ - if (wp->propEpochStartLsn == InvalidXLogRecPtr) - { - Assert(wp->config->syncSafekeepers); - wp_log(LOG, "elected with zero propEpochStartLsn in sync-safekeepers, exiting"); - wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); - } - - if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers) + if (wp->truncateLsn == wp->propTermStartLsn && wp->config->syncSafekeepers) { /* Sync is not needed: just exit */ - wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn); /* unreachable */ } for (int i = 0; i < wp->n_safekeepers; i++) { - if (wp->safekeeper[i].state == SS_IDLE) + if (wp->safekeeper[i].state == SS_WAIT_ELECTED) SendProposerElected(&wp->safekeeper[i]); } /* * The proposer has been elected, and there will be no quorum waiting - * after this point. There will be no safekeeper with state SS_IDLE also, - * because that state is used only for quorum waiting. + * after this point. There will be no safekeeper with state + * SS_WAIT_ELECTED also, because that state is used only for quorum + * waiting. 
*/ if (wp->config->syncSafekeepers) @@ -957,7 +994,7 @@ return; } - wp->api.start_streaming(wp, wp->propEpochStartLsn); + wp->api.start_streaming(wp, wp->propTermStartLsn); /* Should not return here */ } @@ -970,7 +1007,7 @@ GetHighestTerm(TermHistory *th) /* safekeeper's epoch is the term of the highest entry in the log */ static term_t -GetEpoch(Safekeeper *sk) +GetLastLogTerm(Safekeeper *sk) { return GetHighestTerm(&sk->voteResponse.termHistory); } @@ -991,72 +1028,52 @@ SkipXLogPageHeader(WalProposer *wp, XLogRecPtr lsn) } /* - * Called after majority of acceptors gave votes, it calculates the most - * advanced safekeeper (who will be the donor) and epochStartLsn -- LSN since - * which we'll write WAL in our term. - * - * Sets truncateLsn along the way (though it is not of much use at this point -- - * only for skipping recovery). + * Called after a quorum has voted and the proposer starting position (highest + * vote term + flush LSN) is determined (VotesCollected returns true). This + * function adopts it: pushes the LSN to shmem, sets the wp term history, and + * verifies that the basebackup matches. */ static void -DetermineEpochStartLsn(WalProposer *wp) +ProcessPropStartPos(WalProposer *wp) { TermHistory *dth; - int n_ready = 0; WalproposerShmemState *walprop_shared; - wp->propEpochStartLsn = InvalidXLogRecPtr; - wp->donorEpoch = 0; - wp->truncateLsn = InvalidXLogRecPtr; - - for (int i = 0; i < wp->n_safekeepers; i++) - { - if (wp->safekeeper[i].state == SS_IDLE) - { - n_ready++; - - if (GetEpoch(&wp->safekeeper[i]) > wp->donorEpoch || - (GetEpoch(&wp->safekeeper[i]) == wp->donorEpoch && - wp->safekeeper[i].voteResponse.flushLsn > wp->propEpochStartLsn)) - { - wp->donorEpoch = GetEpoch(&wp->safekeeper[i]); - wp->propEpochStartLsn = wp->safekeeper[i].voteResponse.flushLsn; - wp->donor = i; - } - wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); - } - } - - if (n_ready < wp->quorum) - { - /* - * This is a rare case that can be triggered if safekeeper has voted - * and disconnected. In this case, its state will not be SS_IDLE and - * its vote cannot be used, because we clean up `voteResponse` in - * `ShutdownConnection`. - */ - wp_log(FATAL, "missing majority of votes, collected %d, expected %d, got %d", wp->n_votes, wp->quorum, n_ready); - } + /* must have collected votes */ + Assert(wp->state == WPS_ELECTED); /* - * If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are - * bootstrapping and nothing was committed yet. Start streaming then from - * the basebackup LSN. + * If propTermStartLsn is 0, it means flushLsn is 0 everywhere, we are + * bootstrapping and nothing was committed yet. Start streaming from the + * basebackup LSN then. + * + * In case of sync-safekeepers just exit: proceeding is not only pointless + * but harmful, because we'd give safekeepers term history starting with + * 0/0. These hacks will go away once we disable implicit timeline + * creation on safekeepers and create it with non zero LSN from the start. 
*/ - if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers) + if (wp->propTermStartLsn == InvalidXLogRecPtr) { - wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp); - wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); + if (!wp->config->syncSafekeepers) + { + wp->propTermStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp); + wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propTermStartLsn)); + } + else + { + wp_log(LOG, "elected with zero propTermStartLsn in sync-safekeepers, exiting"); + wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn); + } } - pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn); + pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propTermStartLsn); Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers); /* - * We will be generating WAL since propEpochStartLsn, so we should set + * We will be generating WAL since propTermStartLsn, so we should set * availableLsn to mark this LSN as the latest available position. */ - wp->availableLsn = wp->propEpochStartLsn; + wp->availableLsn = wp->propTermStartLsn; /* * Proposer's term history is the donor's + its own entry. @@ -1067,12 +1084,12 @@ DetermineEpochStartLsn(WalProposer *wp) if (dth->n_entries > 0) memcpy(wp->propTermHistory.entries, dth->entries, sizeof(TermSwitchEntry) * dth->n_entries); wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm; - wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn; + wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn; wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", wp->quorum, wp->propTerm, - LSN_FORMAT_ARGS(wp->propEpochStartLsn), + LSN_FORMAT_ARGS(wp->propTermStartLsn), wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port, LSN_FORMAT_ARGS(wp->truncateLsn)); @@ -1090,7 +1107,7 @@ DetermineEpochStartLsn(WalProposer *wp) * Safekeepers don't skip header as they need continious stream of * data, so correct LSN for comparison. */ - if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp)) + if (SkipXLogPageHeader(wp, wp->propTermStartLsn) != wp->api.get_redo_start_lsn(wp)) { /* * However, allow to proceed if last_log_term on the node which @@ -1111,8 +1128,8 @@ DetermineEpochStartLsn(WalProposer *wp) */ disable_core_dump(); wp_log(PANIC, - "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", - LSN_FORMAT_ARGS(wp->propEpochStartLsn), + "collected propTermStartLsn %X/%X, but basebackup LSN %X/%X", + LSN_FORMAT_ARGS(wp->propTermStartLsn), LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp))); } } @@ -1623,7 +1640,7 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp) * Like in Raft, we aren't allowed to commit entries from previous * terms, so ignore reported LSN until it gets to epochStartLsn. */ - responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propEpochStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0; + responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? 
wp->safekeeper[i].appendResponse.flushLsn : 0; } qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn); @@ -1656,10 +1673,10 @@ UpdateDonorShmem(WalProposer *wp) * about its position immediately after election before any feedbacks are * sent. */ - if (wp->safekeeper[wp->donor].state >= SS_IDLE) + if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED) { donor = &wp->safekeeper[wp->donor]; - donor_lsn = wp->propEpochStartLsn; + donor_lsn = wp->propTermStartLsn; } /* @@ -1748,7 +1765,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk) for (int i = 0; i < wp->n_safekeepers; i++) { Safekeeper *sk = &wp->safekeeper[i]; - bool synced = sk->appendResponse.commitLsn >= wp->propEpochStartLsn; + bool synced = sk->appendResponse.commitLsn >= wp->propTermStartLsn; /* alive safekeeper which is not synced yet; wait for it */ if (sk->state != SS_OFFLINE && !synced) @@ -1772,7 +1789,7 @@ */ BroadcastAppendRequest(wp); - wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn); /* unreachable */ } } @@ -2378,7 +2395,7 @@ FormatSafekeeperState(Safekeeper *sk) case SS_HANDSHAKE_RECV: return_val = "handshake (receiving)"; break; - case SS_VOTING: + case SS_WAIT_VOTING: return_val = "voting"; break; case SS_WAIT_VERDICT: @@ -2387,7 +2404,7 @@ case SS_SEND_ELECTED_FLUSH: return_val = "send-announcement-flush"; break; - case SS_IDLE: + case SS_WAIT_ELECTED: return_val = "idle"; break; case SS_ACTIVE: @@ -2476,8 +2493,8 @@ SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_even * Idle states use read-readiness as a sign that the connection * has been disconnected. */ - case SS_VOTING: - case SS_IDLE: + case SS_WAIT_VOTING: + case SS_WAIT_ELECTED: *sk_events = WL_SOCKET_READABLE; return; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 8d1ae26cac..d116bce806 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -73,12 +73,12 @@ typedef enum * Moved externally by execution of SS_HANDSHAKE_RECV, when we received a * quorum of handshakes. */ - SS_VOTING, + SS_WAIT_VOTING, /* * Already sent voting information, waiting to receive confirmation from - * the node. After receiving, moves to SS_IDLE, if the quorum isn't - * reached yet. + * the node. After receiving, moves to SS_WAIT_ELECTED, if the quorum + * isn't reached yet. */ SS_WAIT_VERDICT, @@ -91,7 +91,7 @@ typedef enum * * Moves to SS_ACTIVE only by call to StartStreaming. */ - SS_IDLE, + SS_WAIT_ELECTED, /* * Active phase, when we acquired quorum and have WAL to send or feedback @@ -751,6 +751,15 @@ typedef struct WalProposerConfig #endif } WalProposerConfig; +typedef enum +{ + /* collecting greetings to determine term to campaign for */ + WPS_COLLECTING_TERMS, + /* campaign started, waiting for votes */ + WPS_CAMPAIGN, + /* successfully elected */ + WPS_ELECTED, +} WalProposerState; /* * WAL proposer state. 
@@ -758,6 +767,7 @@ typedef struct WalProposerConfig typedef struct WalProposer { WalProposerConfig *config; + WalProposerState state; /* Current walproposer membership configuration */ MembershipConfiguration mconf; @@ -813,10 +823,10 @@ typedef struct WalProposer TermHistory propTermHistory; /* epoch start lsn of the proposer */ - XLogRecPtr propEpochStartLsn; + XLogRecPtr propTermStartLsn; /* Most advanced acceptor epoch */ - term_t donorEpoch; + term_t donorLastLogTerm; /* Most advanced acceptor */ int donor; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index b21184de57..9c34c90002 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1496,7 +1496,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk) snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port); Assert(!sk->xlogreader); - sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, log_prefix); + sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix); if (sk->xlogreader == NULL) wpg_log(FATAL, "failed to allocate xlog reader"); } diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 3852bfe348..26254beecf 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -290,7 +290,7 @@ impl ConnCfg { "connected to compute node at {host} ({socket_addr}) sslmode={:?}, latency={}, query_id={}", self.0.get_ssl_mode(), ctx.get_proxy_latency(), - ctx.get_testodrome_id(), + ctx.get_testodrome_id().unwrap_or_default(), ); // NB: CancelToken is supposed to hold socket_addr, but we use connect_raw. diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 4f72a86f30..7c1a6206c1 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -272,6 +272,13 @@ impl RequestContext { .set_user_agent(user_agent); } + pub(crate) fn set_testodrome_id(&self, query_id: String) { + self.0 + .try_lock() + .expect("should not deadlock") + .set_testodrome_id(query_id); + } + pub(crate) fn set_auth_method(&self, auth_method: AuthMethod) { let mut this = self.0.try_lock().expect("should not deadlock"); this.auth_method = Some(auth_method); @@ -371,13 +378,12 @@ impl RequestContext { .accumulated() } - pub(crate) fn get_testodrome_id(&self) -> String { + pub(crate) fn get_testodrome_id(&self) -> Option { self.0 .try_lock() .expect("should not deadlock") .testodrome_query_id .clone() - .unwrap_or_default() } pub(crate) fn success(&self) { diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index b55661cec8..e40aa024a8 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -571,6 +571,11 @@ impl ConnectMechanism for TokioMechanism { "compute_id", tracing::field::display(&node_info.aux.compute_id), ); + + if let Some(query_id) = ctx.get_testodrome_id() { + info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id); + } + Ok(poll_client( self.pool.clone(), ctx, @@ -628,6 +633,10 @@ impl ConnectMechanism for HyperMechanism { tracing::field::display(&node_info.aux.compute_id), ); + if let Some(query_id) = ctx.get_testodrome_id() { + info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id); + } + Ok(poll_http2_client( self.pool.clone(), ctx, diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index 8426a0810e..c958d077fc 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -35,6 +35,7 @@ use super::conn_pool_lib::{ 
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, DbUserConn, EndpointConnPool, }; +use super::sql_over_http::SqlOverHttpError; use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::Metrics; @@ -274,18 +275,23 @@ pub(crate) fn poll_client( } impl ClientInnerCommon { - pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), HttpConnError> { + pub(crate) async fn set_jwt_session(&mut self, payload: &[u8]) -> Result<(), SqlOverHttpError> { if let ClientDataEnum::Local(local_data) = &mut self.data { local_data.jti += 1; let token = resign_jwt(&local_data.key, payload, local_data.jti)?; - // discard all cannot run in a transaction. must be executed alone. - self.inner.batch_execute("discard all").await?; + self.inner + .discard_all() + .await + .map_err(SqlOverHttpError::InternalPostgres)?; // initiates the auth session // this is safe from query injections as the jwt format free of any escape characters. let query = format!("select auth.jwt_session_init('{token}')"); - self.inner.batch_execute(&query).await?; + self.inner + .batch_execute(&query) + .await + .map_err(SqlOverHttpError::InternalPostgres)?; let pid = self.inner.get_process_id(); info!(pid, jti = local_data.jti, "user session state init"); diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index acd6a05718..a7f46cbe58 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -446,6 +446,15 @@ async fn request_handler( .map(Into::into), ); + let testodrome_id = request + .headers() + .get("X-Neon-Query-ID") + .map(|value| value.to_str().unwrap_or_default().to_string()); + + if let Some(query_id) = testodrome_id { + ctx.set_testodrome_id(query_id); + } + let span = ctx.span(); info!(parent: &span, "performing websocket upgrade"); diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 93dd531f70..612702231f 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -412,8 +412,12 @@ pub(crate) enum SqlOverHttpError { ResponseTooLarge(usize), #[error("invalid isolation level")] InvalidIsolationLevel, + /// for queries our customers choose to run #[error("{0}")] - Postgres(#[from] postgres_client::Error), + Postgres(#[source] postgres_client::Error), + /// for queries we choose to run + #[error("{0}")] + InternalPostgres(#[source] postgres_client::Error), #[error("{0}")] JsonConversion(#[from] JsonConversionError), #[error("{0}")] @@ -429,6 +433,13 @@ impl ReportableError for SqlOverHttpError { SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User, SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User, SqlOverHttpError::Postgres(p) => p.get_error_kind(), + SqlOverHttpError::InternalPostgres(p) => { + if p.as_db_error().is_some() { + ErrorKind::Service + } else { + ErrorKind::Compute + } + } SqlOverHttpError::JsonConversion(_) => ErrorKind::Postgres, SqlOverHttpError::Cancelled(c) => c.get_error_kind(), } @@ -444,6 +455,7 @@ impl UserFacingError for SqlOverHttpError { SqlOverHttpError::ResponseTooLarge(_) => self.to_string(), SqlOverHttpError::InvalidIsolationLevel => self.to_string(), SqlOverHttpError::Postgres(p) => p.to_string(), + SqlOverHttpError::InternalPostgres(p) => p.to_string(), SqlOverHttpError::JsonConversion(_) => "could not parse postgres response".to_string(), SqlOverHttpError::Cancelled(_) => self.to_string(), } @@ -462,6 +474,7 @@ impl HttpCodeError for SqlOverHttpError { 
SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE, SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST, SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST, + SqlOverHttpError::InternalPostgres(_) => StatusCode::INTERNAL_SERVER_ERROR, SqlOverHttpError::JsonConversion(_) => StatusCode::INTERNAL_SERVER_ERROR, SqlOverHttpError::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, } @@ -671,16 +684,14 @@ async fn handle_db_inner( let authenticate_and_connect = Box::pin( async { let keys = match auth { - AuthData::Password(pw) => { - backend - .authenticate_with_password(ctx, &conn_info.user_info, &pw) - .await? - } - AuthData::Jwt(jwt) => { - backend - .authenticate_with_jwt(ctx, &conn_info.user_info, jwt) - .await? - } + AuthData::Password(pw) => backend + .authenticate_with_password(ctx, &conn_info.user_info, &pw) + .await + .map_err(HttpConnError::AuthError)?, + AuthData::Jwt(jwt) => backend + .authenticate_with_jwt(ctx, &conn_info.user_info, jwt) + .await + .map_err(HttpConnError::AuthError)?, }; let client = match keys.keys { @@ -703,7 +714,7 @@ async fn handle_db_inner( // not strictly necessary to mark success here, // but it's just insurance for if we forget it somewhere else ctx.success(); - Ok::<_, HttpConnError>(client) + Ok::<_, SqlOverHttpError>(client) } .map_err(SqlOverHttpError::from), ); @@ -933,11 +944,15 @@ impl BatchQueryData { builder = builder.deferrable(true); } - let transaction = builder.start().await.inspect_err(|_| { - // if we cannot start a transaction, we should return immediately - // and not return to the pool. connection is clearly broken - discard.discard(); - })?; + let transaction = builder + .start() + .await + .inspect_err(|_| { + // if we cannot start a transaction, we should return immediately + // and not return to the pool. connection is clearly broken + discard.discard(); + }) + .map_err(SqlOverHttpError::Postgres)?; let json_output = match query_batch( config, @@ -950,11 +965,15 @@ impl BatchQueryData { { Ok(json_output) => { info!("commit"); - let status = transaction.commit().await.inspect_err(|_| { - // if we cannot commit - for now don't return connection to pool - // TODO: get a query status from the error - discard.discard(); - })?; + let status = transaction + .commit() + .await + .inspect_err(|_| { + // if we cannot commit - for now don't return connection to pool + // TODO: get a query status from the error + discard.discard(); + }) + .map_err(SqlOverHttpError::Postgres)?; discard.check_idle(status); json_output } @@ -969,11 +988,15 @@ impl BatchQueryData { } Err(err) => { info!("rollback"); - let status = transaction.rollback().await.inspect_err(|_| { - // if we cannot rollback - for now don't return connection to pool - // TODO: get a query status from the error - discard.discard(); - })?; + let status = transaction + .rollback() + .await + .inspect_err(|_| { + // if we cannot rollback - for now don't return connection to pool + // TODO: get a query status from the error + discard.discard(); + }) + .map_err(SqlOverHttpError::Postgres)?; discard.check_idle(status); return Err(err); } @@ -1032,7 +1055,12 @@ async fn query_to_json( let query_start = Instant::now(); let query_params = data.params; - let mut row_stream = std::pin::pin!(client.query_raw_txt(&data.query, query_params).await?); + let mut row_stream = std::pin::pin!( + client + .query_raw_txt(&data.query, query_params) + .await + .map_err(SqlOverHttpError::Postgres)? 
+ ); let query_acknowledged = Instant::now(); // Manually drain the stream into a vector to leave row_stream hanging @@ -1040,7 +1068,7 @@ async fn query_to_json( // big. let mut rows: Vec = Vec::new(); while let Some(row) = row_stream.next().await { - let row = row?; + let row = row.map_err(SqlOverHttpError::Postgres)?; *current_size += row.body_len(); rows.push(row); // we don't have a streaming response support yet so this is to prevent OOM @@ -1091,7 +1119,14 @@ async fn query_to_json( "dataTypeModifier": c.type_modifier(), "format": "text", })); - columns.push(client.get_type(c.type_oid()).await?); + + match client.get_type(c.type_oid()).await { + Ok(t) => columns.push(t), + Err(err) => { + tracing::warn!(?err, "unable to query type information"); + return Err(SqlOverHttpError::InternalPostgres(err)); + } + } } let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode); diff --git a/safekeeper/tests/walproposer_sim/walproposer_api.rs b/safekeeper/tests/walproposer_sim/walproposer_api.rs index 6451589e80..82e7a32881 100644 --- a/safekeeper/tests/walproposer_sim/walproposer_api.rs +++ b/safekeeper/tests/walproposer_sim/walproposer_api.rs @@ -511,8 +511,7 @@ impl ApiImpl for SimulationApi { // collected quorum with lower term, then got rejected by next connected safekeeper executor::exit(1, msg.to_owned()); } - if msg.contains("collected propEpochStartLsn") && msg.contains(", but basebackup LSN ") - { + if msg.contains("collected propTermStartLsn") && msg.contains(", but basebackup LSN ") { // sync-safekeepers collected wrong quorum, walproposer collected another quorum executor::exit(1, msg.to_owned()); } @@ -529,7 +528,7 @@ impl ApiImpl for SimulationApi { } fn after_election(&self, wp: &mut walproposer::bindings::WalProposer) { - let prop_lsn = wp.propEpochStartLsn; + let prop_lsn = wp.propTermStartLsn; let prop_term = wp.propTerm; let mut prev_lsn: u64 = 0; @@ -612,7 +611,7 @@ impl ApiImpl for SimulationApi { sk: &mut walproposer::bindings::Safekeeper, ) -> bool { let mut startpos = wp.truncateLsn; - let endpos = wp.propEpochStartLsn; + let endpos = wp.propTermStartLsn; if startpos == endpos { debug!("recovery_download: nothing to download"); diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index c956b1abbc..f14341c7bc 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -152,10 +152,8 @@ impl TenantRefAccumulator { } } - if !ancestor_refs.is_empty() { - tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len()); - self.ancestor_ref_shards.update(ttid, ancestor_refs); - } + tracing::info!(%ttid, "Found {} ancestor refs", ancestor_refs.len()); + self.ancestor_ref_shards.update(ttid, ancestor_refs); } /// Consume Self and return a vector of ancestor tenant shards that should be GC'd, and map of referenced ancestor layers to preserve @@ -779,7 +777,7 @@ pub async fn pageserver_physical_gc( let mut summary = GcSummary::default(); { - let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); + let timelines = timelines.try_buffered(CONCURRENCY); let timelines = timelines.try_flatten(); let timelines = timelines.map_ok(|(ttid, tenant_manifest_arc)| { @@ -793,8 +791,8 @@ pub async fn pageserver_physical_gc( tenant_manifest_arc, ) }); - let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); - + let timelines = timelines.try_buffered(CONCURRENCY); + let mut timelines = std::pin::pin!(timelines); // Drain futures 
for per-shard GC, populating accumulator as a side effect while let Some(i) = timelines.next().await { summary.merge(i?);