Compare commits

...

9 Commits

Author SHA1 Message Date
BodoBolero
d0d29468d8 one more segfault in handling error messages 2025-02-06 17:43:00 +01:00
BodoBolero
567a665dc4 tried patch locally 2025-02-06 13:52:02 +01:00
BodoBolero
b0007302d0 include patches in repo 2025-02-06 13:37:00 +01:00
BodoBolero
b35dd198c3 fix pgcopydb seg fault and -c idle_in_transaction_session_timeout=0 2025-02-06 11:27:29 +01:00
Arpad Müller
b66fbd6176 Warn on basebackups for archived timelines (#10688)
We don't want any external requests for an archived timeline. This
includes basebackup requests, i.e. when a compute is being started up.

Therefore, we'd like to forbid such basebackup requests: any attempt to
get a basebackup on an archived timeline (or any getpage request really)
is a cplane bug. Make this a warning for now so that, if there is
potentially a bug, we can detect cases in the wild before they cause
stuck operations, but the intention is to return an error eventually.

Related: #9548
2025-02-06 10:09:20 +00:00
Vlad Lazar
95588dab98 safekeeper: fix wal fan-out shard subscription data race (#10677)
## Problem

[This select
arm](https://github.com/neondatabase/neon/blob/main/safekeeper/src/send_interpreted_wal.rs#L414)
runs when we want to attach a new reader to the current cursor.
It checks the current position of the cursor and resets it if required.

The current position of the cursor is updated in the [other select
arm](https://github.com/neondatabase/neon/blob/main/safekeeper/src/send_interpreted_wal.rs#L336-L345).
That runs when we get some WAL to send.

Now, what happens if we want to attach two shards consecutively to the
cursor?
Let's say [this select
arm](https://github.com/neondatabase/neon/blob/main/safekeeper/src/send_interpreted_wal.rs#L397)
runs twice in a row.

Let's assume cursor is currently at LSN X. First shard wants to attach
at position V
and the other one at W. Assume X > W > V.

First shard resets the stream to position V. Second shard comes in, 
sees stale cursor position X and resets it to W. This means that the 
first shard doesn't get wal in the [V, W) range.

## Summary of changes

Ultimately, this boils down to the current position not being kept in
sync with the reset of the WAL stream. This patch fixes the race by
updating it when resetting the WAL stream and adds a unit test repro.

Closes https://github.com/neondatabase/cloud/issues/23750
2025-02-06 09:24:28 +00:00
Christian Schwarz
1686d9e733 perf(page_service): dont .instrument(span.clone()) the response flush (#10686)
On my AX102 Hetzner box, removing this line removes about 20us from the
`latency_mean` result in

`test_pageserver_characterize_latencies_with_1_client_and_throughput_with_many_clients_one_tenant`.

If the same 20us can be removed in the nightly benchmark run, this will
be a ~10% improvement because there, mean latencies are about ~220us.

This span was added during batching refactors, we didn't have it before,
and I don't think it's terribly useful.

refs
- https://github.com/neondatabase/cloud/issues/21759
2025-02-06 08:33:37 +00:00
Erik Grinaker
abcd00181c pageserver: set a concurrency limit for LocalFS (#10676)
## Problem

The local filesystem backend for remote storage doesn't set a
concurrency limit. While it can't/won't enforce a concurrency limit
itself, this also bounds the upload queue concurrency. Some tests create
thousands of uploads, which slows down the quadratic scheduling of the
upload queue, and there is no point spawning that many Tokio tasks.

Resolves #10409.

## Summary of changes

Set a concurrency limit of 100 for the LocalFS backend.

Before: `test_layer_map[release-pg17].test_query: 68.338 s`
After: `test_layer_map[release-pg17].test_query: 5.209 s`
2025-02-06 07:24:36 +00:00
Konstantin Knizhnik
01f0be03b5 Fix bugs in lfc_cache_containsv (#10682)
## Problem

Incorrect manipulations with iteration index in `lfc_cache_containsv`

## Summary of changes

```
-		int		this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
+		int		this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
-		if (i + 1 >= nblocks)
+		if (i >= nblocks)
```

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-02-06 07:00:00 +00:00
12 changed files with 230 additions and 69 deletions

View File

@@ -24,3 +24,4 @@
!storage_controller/
!vendor/postgres-*/
!workspace_hack/
!build_tools/patches

View File

@@ -12,6 +12,8 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
set -e && \
apt update && \
@@ -44,6 +46,7 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
mkdir /tmp/pgcopydb && \
tar -xzf /tmp/pgcopydb.tar.gz -C /tmp/pgcopydb --strip-components=1 && \
cd /tmp/pgcopydb && \
patch -p1 < /pgcopydbv017.patch && \
make -s clean && \
make -s -j12 install && \
libpq_path=$(find /lib /usr/lib -name "libpq.so.5" | head -n 1) && \

View File

@@ -0,0 +1,57 @@
diff --git a/src/bin/pgcopydb/copydb.c b/src/bin/pgcopydb/copydb.c
index d730b03..69a9be9 100644
--- a/src/bin/pgcopydb/copydb.c
+++ b/src/bin/pgcopydb/copydb.c
@@ -44,6 +44,7 @@ GUC dstSettings[] = {
{ "synchronous_commit", "'off'" },
{ "statement_timeout", "0" },
{ "lock_timeout", "0" },
+ { "idle_in_transaction_session_timeout", "0" },
{ NULL, NULL },
};
diff --git a/src/bin/pgcopydb/pgsql.c b/src/bin/pgcopydb/pgsql.c
index 94f2f46..e051ba8 100644
--- a/src/bin/pgcopydb/pgsql.c
+++ b/src/bin/pgcopydb/pgsql.c
@@ -2319,6 +2319,11 @@ pgsql_execute_log_error(PGSQL *pgsql,
LinesBuffer lbuf = { 0 };
+ if (message != NULL){
+ // make sure message is writable by splitLines
+ message = strdup(message);
+ }
+
if (!splitLines(&lbuf, message))
{
/* errors have already been logged */
@@ -2332,6 +2337,7 @@ pgsql_execute_log_error(PGSQL *pgsql,
PQbackendPID(pgsql->connection),
lbuf.lines[lineNumber]);
}
+ free(message); // free copy of message we created above
if (pgsql->logSQL)
{
@@ -3174,11 +3180,18 @@ pgcopy_log_error(PGSQL *pgsql, PGresult *res, const char *context)
/* errors have already been logged */
return;
}
-
if (res != NULL)
{
char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
- strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
+ if (sqlstate == NULL)
+ {
+ // PQresultErrorField returned NULL!
+ pgsql->sqlstate[0] = '\0'; // Set to an empty string to avoid segfault
+ }
+ else
+ {
+ strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
+ }
}
char *endpoint =

View File

@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use crate::{
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
/// External backup storage configuration, enough for creating a client for that storage.
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
impl RemoteStorageConfig {
/// Helper to fetch the configured concurrency limit.
pub fn concurrency_limit(&self) -> Option<usize> {
pub fn concurrency_limit(&self) -> usize {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
}
}
}

View File

@@ -65,6 +65,12 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
/// Here, a limit of max 20k concurrent connections was noted.
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
/// Set this limit analogously to the S3 limit.
///
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
/// No limits on the client side, which currently means 1000 for AWS S3.
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;

View File

@@ -1280,8 +1280,6 @@ impl PageServerHandler {
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
Ok(())
@@ -2037,6 +2035,12 @@ impl PageServerHandler {
.get(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
if timeline.is_archived() == Some(true) {
// TODO after a grace period, turn this log line into a hard error
tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it.");
//return Err(QueryError::NotFound("timeline is archived".into()))
}
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.

View File

@@ -437,8 +437,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
self.update_remote_physical_size_gauge(Some(index_part));
@@ -461,8 +460,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
self.update_remote_physical_size_gauge(None);
@@ -484,8 +482,7 @@ impl RemoteTimelineClient {
.conf
.remote_storage_config
.as_ref()
.and_then(|r| r.concurrency_limit())
.unwrap_or(0);
.map_or(0, |r| r.concurrency_limit());
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;

View File

@@ -509,47 +509,44 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
hash = get_hash_value(lfc_hash, &tag);
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
LWLockAcquire(lfc_lock, LW_SHARED);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return 0;
}
while (true)
{
int this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
if (entry != NULL)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
{
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
if ((entry->bitmap[chunk_offs >> 5] &
((uint32)1 << (chunk_offs & 31))) != 0)
{
if ((entry->bitmap[chunk_offs >> 5] &
((uint32)1 << (chunk_offs & 31))) != 0)
{
BITMAP_SET(bitmap, i);
found++;
}
BITMAP_SET(bitmap, i);
found++;
}
}
else
{
i += this_chunk;
}
}
else
{
LWLockRelease(lfc_lock);
return found;
i += this_chunk;
}
/*
* Break out of the iteration before doing expensive stuff for
* a next iteration
*/
if (i + 1 >= nblocks)
if (i >= nblocks)
break;
/*

View File

@@ -120,6 +120,20 @@ pub enum InterpretedWalReaderError {
WalStreamClosed,
}
/// Outcome of an attempt to move the WAL reader's cursor on behalf of a newly
/// attached shard.
///
/// Either variant carries the cursor position in effect after the attempt.
enum CurrentPositionUpdate {
    /// The cursor was moved; the payload is the position it now points at.
    Reset(Lsn),
    /// The cursor was left untouched; the payload is its unchanged position.
    NotReset(Lsn),
}

impl CurrentPositionUpdate {
    /// Returns the position carried by this update, whichever variant it is.
    fn current_position(&self) -> Lsn {
        match self {
            Self::Reset(position) | Self::NotReset(position) => *position,
        }
    }
}
impl InterpretedWalReaderState {
fn current_position(&self) -> Option<Lsn> {
match self {
@@ -129,6 +143,26 @@ impl InterpretedWalReaderState {
InterpretedWalReaderState::Done => None,
}
}
/// Moves the reader's cursor back to `new_shard_start_pos` when that position
/// is smaller than the current one, and reports whether the cursor was moved.
///
/// The returned update carries the cursor position in effect after the call.
///
/// # Panics
///
/// Panics if the reader is in the `Done` state.
fn maybe_reset(&mut self, new_shard_start_pos: Lsn) -> CurrentPositionUpdate {
    let InterpretedWalReaderState::Running {
        current_position: cursor,
        ..
    } = self
    else {
        panic!("maybe_reset called on finished reader")
    };

    // Only ever move the cursor backwards; a shard starting at or after the
    // cursor does not require a reset.
    if new_shard_start_pos < *cursor {
        *cursor = new_shard_start_pos;
        return CurrentPositionUpdate::Reset(*cursor);
    }

    CurrentPositionUpdate::NotReset(*cursor)
}
}
pub(crate) struct AttachShardNotification {
@@ -410,15 +444,24 @@ impl InterpretedWalReader {
};
senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
let current_pos = self.state.read().unwrap().current_position().unwrap();
if start_pos < current_pos {
self.wal_stream.reset(start_pos).await;
wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
}
// If the shard is subscribing below the current position then we need
// to update the cursor that tracks where we are at in the WAL
// ([`Self::state`]) and reset the WAL stream itself
// ([`Self::wal_stream`]). This must be done atomically from the POV of
// anything outside the select statement.
let position_reset = self.state.write().unwrap().maybe_reset(start_pos);
match position_reset {
CurrentPositionUpdate::Reset(to) => {
self.wal_stream.reset(to).await;
wal_decoder = WalStreamDecoder::new(to, self.pg_version);
},
CurrentPositionUpdate::NotReset(_) => {}
};
tracing::info!(
"Added shard sender {} with start_pos={} current_pos={}",
ShardSenderId::new(shard_id, new_sender_id), start_pos, current_pos
ShardSenderId::new(shard_id, new_sender_id), start_pos, position_reset.current_position()
);
}
}
@@ -584,7 +627,7 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
.await
.unwrap();
let end_pos = end_watch.get();
@@ -715,7 +758,6 @@ mod tests {
const MSG_COUNT: usize = 200;
const PG_VERSION: u32 = 17;
const SHARD_COUNT: u8 = 2;
const ATTACHED_SHARDS: u8 = 4;
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
let env = Env::new(true).unwrap();
@@ -725,9 +767,11 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
.await
.unwrap();
let mut next_record_lsns = Vec::default();
let end_watch =
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
.await
.unwrap();
let end_pos = end_watch.get();
let streaming_wal_reader = StreamingWalReader::new(
@@ -746,38 +790,71 @@ mod tests {
)
.unwrap();
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
let mut batch_receivers = vec![rx];
/// Test-local bookkeeping for one subscriber of the interpreted WAL reader.
struct Sender {
    /// Sending half of the batch channel; `Some` until the test hands it to the
    /// reader via `take()`.
    tx: Option<tokio::sync::mpsc::Sender<Batch>>,
    /// Receiving half of the batch channel, drained by the test.
    rx: tokio::sync::mpsc::Receiver<Batch>,
    /// Shard identity this subscriber attaches with.
    shard: ShardIdentity,
    /// Position this subscriber asks to start receiving WAL from.
    start_lsn: Lsn,
    /// `next_record_lsn` of every record received so far, in arrival order.
    received_next_record_lsns: Vec<Lsn>,
}

impl Sender {
    /// Creates a subscriber with a fresh batch channel and no records received yet.
    fn new(start_lsn: Lsn, shard: ShardIdentity) -> Self {
        let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
        Self {
            tx: Some(batch_tx),
            rx: batch_rx,
            shard,
            start_lsn,
            received_next_record_lsns: Vec::new(),
        }
    }
}
assert!(next_record_lsns.len() > 7);
let start_lsns = vec![
next_record_lsns[5],
next_record_lsns[1],
next_record_lsns[3],
];
let mut senders = start_lsns
.into_iter()
.map(|lsn| Sender::new(lsn, shard_0))
.collect::<Vec<_>>();
let first_sender = senders.first_mut().unwrap();
let handle = InterpretedWalReader::spawn(
streaming_wal_reader,
start_lsn,
tx,
shard_0,
first_sender.start_lsn,
first_sender.tx.take().unwrap(),
first_sender.shard,
PG_VERSION,
&Some("pageserver".to_string()),
);
for _ in 0..(ATTACHED_SHARDS - 1) {
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
handle.fanout(shard_0, tx, start_lsn).unwrap();
batch_receivers.push(rx);
for sender in senders.iter_mut().skip(1) {
handle
.fanout(sender.shard, sender.tx.take().unwrap(), sender.start_lsn)
.unwrap();
}
loop {
let batch = batch_receivers.first_mut().unwrap().recv().await.unwrap();
for rx in batch_receivers.iter_mut().skip(1) {
let other_batch = rx.recv().await.unwrap();
assert_eq!(batch.wal_end_lsn, other_batch.wal_end_lsn);
assert_eq!(
batch.available_wal_end_lsn,
other_batch.available_wal_end_lsn
for sender in senders.iter_mut() {
loop {
let batch = sender.rx.recv().await.unwrap();
tracing::info!(
"Sender with start_lsn={} received batch ending at {} with {} records",
sender.start_lsn,
batch.wal_end_lsn,
batch.records.records.len()
);
}
if batch.wal_end_lsn == batch.available_wal_end_lsn {
break;
for rec in batch.records.records {
sender.received_next_record_lsns.push(rec.next_record_lsn);
}
if batch.wal_end_lsn == batch.available_wal_end_lsn {
break;
}
}
}
@@ -792,5 +869,20 @@ mod tests {
}
assert!(done);
for sender in senders {
tracing::info!(
"Validating records received by sender with start_lsn={}",
sender.start_lsn
);
assert!(sender.received_next_record_lsns.is_sorted());
let expected = next_record_lsns
.iter()
.filter(|lsn| **lsn > sender.start_lsn)
.copied()
.collect::<Vec<_>>();
assert_eq!(sender.received_next_record_lsns, expected);
}
}
}

View File

@@ -122,6 +122,7 @@ impl Env {
start_lsn: Lsn,
msg_size: usize,
msg_count: usize,
mut next_record_lsns: Option<&mut Vec<Lsn>>,
) -> anyhow::Result<EndWatch> {
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
@@ -130,7 +131,7 @@ impl Env {
WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
let prefix = c"p";
let prefix = c"neon-file:";
let prefixlen = prefix.to_bytes_with_nul().len();
assert!(msg_size >= prefixlen);
let message = vec![0; msg_size - prefixlen];
@@ -139,6 +140,9 @@ impl Env {
&mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
for _ in 0..msg_count {
let (lsn, record) = walgen.next().unwrap();
if let Some(ref mut lsns) = next_record_lsns {
lsns.push(lsn);
}
let req = AppendRequest {
h: AppendRequestHeader {

View File

@@ -246,7 +246,7 @@ mod tests {
.unwrap();
let resident_tli = tli.wal_residence_guard().await.unwrap();
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
.await
.unwrap();
let end_pos = end_watch.get();

View File

@@ -136,7 +136,7 @@ def run_command_and_log_output(command, log_file_path: Path):
"LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
"PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
"PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
"PGOPTIONS": "-c idle_in_transaction_session_timeout=0 -c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
}
# Combine the current environment with custom variables
env = os.environ.copy()