mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-12 18:50:37 +00:00
Compare commits
9 Commits
release-pr
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0d29468d8 | ||
|
|
567a665dc4 | ||
|
|
b0007302d0 | ||
|
|
b35dd198c3 | ||
|
|
b66fbd6176 | ||
|
|
95588dab98 | ||
|
|
1686d9e733 | ||
|
|
abcd00181c | ||
|
|
01f0be03b5 |
@@ -24,3 +24,4 @@
|
||||
!storage_controller/
|
||||
!vendor/postgres-*/
|
||||
!workspace_hack/
|
||||
!build_tools/patches
|
||||
|
||||
@@ -12,6 +12,8 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
|
||||
echo -e "retry_connrefused = on\ntimeout=15\ntries=5\n" > /root/.wgetrc && \
|
||||
echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /root/.curlrc
|
||||
|
||||
COPY build_tools/patches/pgcopydbv017.patch /pgcopydbv017.patch
|
||||
|
||||
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
|
||||
set -e && \
|
||||
apt update && \
|
||||
@@ -44,6 +46,7 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
|
||||
mkdir /tmp/pgcopydb && \
|
||||
tar -xzf /tmp/pgcopydb.tar.gz -C /tmp/pgcopydb --strip-components=1 && \
|
||||
cd /tmp/pgcopydb && \
|
||||
patch -p1 < /pgcopydbv017.patch && \
|
||||
make -s clean && \
|
||||
make -s -j12 install && \
|
||||
libpq_path=$(find /lib /usr/lib -name "libpq.so.5" | head -n 1) && \
|
||||
|
||||
57
build_tools/patches/pgcopydbv017.patch
Normal file
57
build_tools/patches/pgcopydbv017.patch
Normal file
@@ -0,0 +1,57 @@
|
||||
diff --git a/src/bin/pgcopydb/copydb.c b/src/bin/pgcopydb/copydb.c
|
||||
index d730b03..69a9be9 100644
|
||||
--- a/src/bin/pgcopydb/copydb.c
|
||||
+++ b/src/bin/pgcopydb/copydb.c
|
||||
@@ -44,6 +44,7 @@ GUC dstSettings[] = {
|
||||
{ "synchronous_commit", "'off'" },
|
||||
{ "statement_timeout", "0" },
|
||||
{ "lock_timeout", "0" },
|
||||
+ { "idle_in_transaction_session_timeout", "0" },
|
||||
{ NULL, NULL },
|
||||
};
|
||||
|
||||
diff --git a/src/bin/pgcopydb/pgsql.c b/src/bin/pgcopydb/pgsql.c
|
||||
index 94f2f46..e051ba8 100644
|
||||
--- a/src/bin/pgcopydb/pgsql.c
|
||||
+++ b/src/bin/pgcopydb/pgsql.c
|
||||
@@ -2319,6 +2319,11 @@ pgsql_execute_log_error(PGSQL *pgsql,
|
||||
|
||||
LinesBuffer lbuf = { 0 };
|
||||
|
||||
+ if (message != NULL){
|
||||
+ // make sure message is writable by splitLines
|
||||
+ message = strdup(message);
|
||||
+ }
|
||||
+
|
||||
if (!splitLines(&lbuf, message))
|
||||
{
|
||||
/* errors have already been logged */
|
||||
@@ -2332,6 +2337,7 @@ pgsql_execute_log_error(PGSQL *pgsql,
|
||||
PQbackendPID(pgsql->connection),
|
||||
lbuf.lines[lineNumber]);
|
||||
}
|
||||
+ free(message); // free copy of message we created above
|
||||
|
||||
if (pgsql->logSQL)
|
||||
{
|
||||
@@ -3174,11 +3180,18 @@ pgcopy_log_error(PGSQL *pgsql, PGresult *res, const char *context)
|
||||
/* errors have already been logged */
|
||||
return;
|
||||
}
|
||||
-
|
||||
if (res != NULL)
|
||||
{
|
||||
char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
|
||||
- strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
|
||||
+ if (sqlstate == NULL)
|
||||
+ {
|
||||
+ // PQresultErrorField returned NULL!
|
||||
+ pgsql->sqlstate[0] = '\0'; // Set to an empty string to avoid segfault
|
||||
+ }
|
||||
+ else
|
||||
+ {
|
||||
+ strlcpy(pgsql->sqlstate, sqlstate, sizeof(pgsql->sqlstate));
|
||||
+ }
|
||||
}
|
||||
|
||||
char *endpoint =
|
||||
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT,
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
};
|
||||
|
||||
/// External backup storage configuration, enough for creating a client for that storage.
|
||||
@@ -45,11 +45,11 @@ impl RemoteStorageKind {
|
||||
|
||||
impl RemoteStorageConfig {
|
||||
/// Helper to fetch the configured concurrency limit.
|
||||
pub fn concurrency_limit(&self) -> Option<usize> {
|
||||
pub fn concurrency_limit(&self) -> usize {
|
||||
match &self.storage {
|
||||
RemoteStorageKind::LocalFs { .. } => None,
|
||||
RemoteStorageKind::AwsS3(c) => Some(c.concurrency_limit.into()),
|
||||
RemoteStorageKind::AzureContainer(c) => Some(c.concurrency_limit.into()),
|
||||
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
|
||||
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
|
||||
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,12 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// Here, a limit of max 20k concurrent connections was noted.
|
||||
/// <https://learn.microsoft.com/en-us/answers/questions/1301863/is-there-any-limitation-to-concurrent-connections>
|
||||
pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// Set this limit analogously to the S3 limit.
|
||||
///
|
||||
/// The local filesystem backend doesn't enforce a concurrency limit itself, but this also bounds
|
||||
/// the upload queue concurrency. Some tests create thousands of uploads, which slows down the
|
||||
/// quadratic scheduling of the upload queue, and there is no point spawning so many Tokio tasks.
|
||||
pub const DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT: usize = 100;
|
||||
/// No limits on the client side, which currenltly means 1000 for AWS S3.
|
||||
/// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html#API_ListObjectsV2_RequestSyntax>
|
||||
pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
|
||||
|
||||
@@ -1280,8 +1280,6 @@ impl PageServerHandler {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// and log the info! line inside the request span
|
||||
.instrument(span.clone())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
@@ -2037,6 +2035,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
|
||||
if timeline.is_archived() == Some(true) {
|
||||
// TODO after a grace period, turn this log line into a hard error
|
||||
tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it.");
|
||||
//return Err(QueryError::NotFound("timeline is archived".into()))
|
||||
}
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
// Backup was requested at a particular LSN. Wait for it to arrive.
|
||||
|
||||
@@ -437,8 +437,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
|
||||
self.update_remote_physical_size_gauge(Some(index_part));
|
||||
@@ -461,8 +460,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_empty_remote(local_metadata, inprogress_limit)?;
|
||||
self.update_remote_physical_size_gauge(None);
|
||||
@@ -484,8 +482,7 @@ impl RemoteTimelineClient {
|
||||
.conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.and_then(|r| r.concurrency_limit())
|
||||
.unwrap_or(0);
|
||||
.map_or(0, |r| r.concurrency_limit());
|
||||
|
||||
let mut upload_queue = self.upload_queue.lock().unwrap();
|
||||
upload_queue.initialize_with_current_remote_index_part(index_part, inprogress_limit)?;
|
||||
|
||||
@@ -509,47 +509,44 @@ lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
|
||||
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
|
||||
|
||||
tag.blockNum = (blkno + i) & ~(BLOCKS_PER_CHUNK - 1);
|
||||
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1);
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
chunk_offs = (blkno + i) & (BLOCKS_PER_CHUNK - 1);
|
||||
chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_SHARED);
|
||||
|
||||
if (!LFC_ENABLED())
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return 0;
|
||||
}
|
||||
while (true)
|
||||
{
|
||||
int this_chunk = Min(nblocks, BLOCKS_PER_CHUNK - chunk_offs);
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
int this_chunk = Min(nblocks - i, BLOCKS_PER_CHUNK - chunk_offs);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
|
||||
if (entry != NULL)
|
||||
if (entry != NULL)
|
||||
{
|
||||
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
|
||||
{
|
||||
for (; chunk_offs < BLOCKS_PER_CHUNK && i < nblocks; chunk_offs++, i++)
|
||||
if ((entry->bitmap[chunk_offs >> 5] &
|
||||
((uint32)1 << (chunk_offs & 31))) != 0)
|
||||
{
|
||||
if ((entry->bitmap[chunk_offs >> 5] &
|
||||
((uint32)1 << (chunk_offs & 31))) != 0)
|
||||
{
|
||||
BITMAP_SET(bitmap, i);
|
||||
found++;
|
||||
}
|
||||
BITMAP_SET(bitmap, i);
|
||||
found++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
i += this_chunk;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LWLockRelease(lfc_lock);
|
||||
return found;
|
||||
i += this_chunk;
|
||||
}
|
||||
|
||||
/*
|
||||
* Break out of the iteration before doing expensive stuff for
|
||||
* a next iteration
|
||||
*/
|
||||
if (i + 1 >= nblocks)
|
||||
if (i >= nblocks)
|
||||
break;
|
||||
|
||||
/*
|
||||
|
||||
@@ -120,6 +120,20 @@ pub enum InterpretedWalReaderError {
|
||||
WalStreamClosed,
|
||||
}
|
||||
|
||||
enum CurrentPositionUpdate {
|
||||
Reset(Lsn),
|
||||
NotReset(Lsn),
|
||||
}
|
||||
|
||||
impl CurrentPositionUpdate {
|
||||
fn current_position(&self) -> Lsn {
|
||||
match self {
|
||||
CurrentPositionUpdate::Reset(lsn) => *lsn,
|
||||
CurrentPositionUpdate::NotReset(lsn) => *lsn,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InterpretedWalReaderState {
|
||||
fn current_position(&self) -> Option<Lsn> {
|
||||
match self {
|
||||
@@ -129,6 +143,26 @@ impl InterpretedWalReaderState {
|
||||
InterpretedWalReaderState::Done => None,
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the current position of the WAL reader if the requested starting position
|
||||
// of the new shard is smaller than the current value.
|
||||
fn maybe_reset(&mut self, new_shard_start_pos: Lsn) -> CurrentPositionUpdate {
|
||||
match self {
|
||||
InterpretedWalReaderState::Running {
|
||||
current_position, ..
|
||||
} => {
|
||||
if new_shard_start_pos < *current_position {
|
||||
*current_position = new_shard_start_pos;
|
||||
CurrentPositionUpdate::Reset(*current_position)
|
||||
} else {
|
||||
CurrentPositionUpdate::NotReset(*current_position)
|
||||
}
|
||||
}
|
||||
InterpretedWalReaderState::Done => {
|
||||
panic!("maybe_reset called on finished reader")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct AttachShardNotification {
|
||||
@@ -410,15 +444,24 @@ impl InterpretedWalReader {
|
||||
};
|
||||
|
||||
senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
|
||||
let current_pos = self.state.read().unwrap().current_position().unwrap();
|
||||
if start_pos < current_pos {
|
||||
self.wal_stream.reset(start_pos).await;
|
||||
wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
|
||||
}
|
||||
|
||||
// If the shard is subscribing below the current position the we need
|
||||
// to update the cursor that tracks where we are at in the WAL
|
||||
// ([`Self::state`]) and reset the WAL stream itself
|
||||
// (`[Self::wal_stream`]). This must be done atomically from the POV of
|
||||
// anything outside the select statement.
|
||||
let position_reset = self.state.write().unwrap().maybe_reset(start_pos);
|
||||
match position_reset {
|
||||
CurrentPositionUpdate::Reset(to) => {
|
||||
self.wal_stream.reset(to).await;
|
||||
wal_decoder = WalStreamDecoder::new(to, self.pg_version);
|
||||
},
|
||||
CurrentPositionUpdate::NotReset(_) => {}
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"Added shard sender {} with start_pos={} current_pos={}",
|
||||
ShardSenderId::new(shard_id, new_sender_id), start_pos, current_pos
|
||||
ShardSenderId::new(shard_id, new_sender_id), start_pos, position_reset.current_position()
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -584,7 +627,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
@@ -715,7 +758,6 @@ mod tests {
|
||||
const MSG_COUNT: usize = 200;
|
||||
const PG_VERSION: u32 = 17;
|
||||
const SHARD_COUNT: u8 = 2;
|
||||
const ATTACHED_SHARDS: u8 = 4;
|
||||
|
||||
let start_lsn = Lsn::from_str("0/149FD18").unwrap();
|
||||
let env = Env::new(true).unwrap();
|
||||
@@ -725,9 +767,11 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut next_record_lsns = Vec::default();
|
||||
let end_watch =
|
||||
Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
let streaming_wal_reader = StreamingWalReader::new(
|
||||
@@ -746,38 +790,71 @@ mod tests {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
let mut batch_receivers = vec![rx];
|
||||
struct Sender {
|
||||
tx: Option<tokio::sync::mpsc::Sender<Batch>>,
|
||||
rx: tokio::sync::mpsc::Receiver<Batch>,
|
||||
shard: ShardIdentity,
|
||||
start_lsn: Lsn,
|
||||
received_next_record_lsns: Vec<Lsn>,
|
||||
}
|
||||
|
||||
impl Sender {
|
||||
fn new(start_lsn: Lsn, shard: ShardIdentity) -> Self {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
Self {
|
||||
tx: Some(tx),
|
||||
rx,
|
||||
shard,
|
||||
start_lsn,
|
||||
received_next_record_lsns: Vec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(next_record_lsns.len() > 7);
|
||||
let start_lsns = vec![
|
||||
next_record_lsns[5],
|
||||
next_record_lsns[1],
|
||||
next_record_lsns[3],
|
||||
];
|
||||
let mut senders = start_lsns
|
||||
.into_iter()
|
||||
.map(|lsn| Sender::new(lsn, shard_0))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let first_sender = senders.first_mut().unwrap();
|
||||
let handle = InterpretedWalReader::spawn(
|
||||
streaming_wal_reader,
|
||||
start_lsn,
|
||||
tx,
|
||||
shard_0,
|
||||
first_sender.start_lsn,
|
||||
first_sender.tx.take().unwrap(),
|
||||
first_sender.shard,
|
||||
PG_VERSION,
|
||||
&Some("pageserver".to_string()),
|
||||
);
|
||||
|
||||
for _ in 0..(ATTACHED_SHARDS - 1) {
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
|
||||
handle.fanout(shard_0, tx, start_lsn).unwrap();
|
||||
batch_receivers.push(rx);
|
||||
for sender in senders.iter_mut().skip(1) {
|
||||
handle
|
||||
.fanout(sender.shard, sender.tx.take().unwrap(), sender.start_lsn)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
loop {
|
||||
let batch = batch_receivers.first_mut().unwrap().recv().await.unwrap();
|
||||
for rx in batch_receivers.iter_mut().skip(1) {
|
||||
let other_batch = rx.recv().await.unwrap();
|
||||
|
||||
assert_eq!(batch.wal_end_lsn, other_batch.wal_end_lsn);
|
||||
assert_eq!(
|
||||
batch.available_wal_end_lsn,
|
||||
other_batch.available_wal_end_lsn
|
||||
for sender in senders.iter_mut() {
|
||||
loop {
|
||||
let batch = sender.rx.recv().await.unwrap();
|
||||
tracing::info!(
|
||||
"Sender with start_lsn={} received batch ending at {} with {} records",
|
||||
sender.start_lsn,
|
||||
batch.wal_end_lsn,
|
||||
batch.records.records.len()
|
||||
);
|
||||
}
|
||||
|
||||
if batch.wal_end_lsn == batch.available_wal_end_lsn {
|
||||
break;
|
||||
for rec in batch.records.records {
|
||||
sender.received_next_record_lsns.push(rec.next_record_lsn);
|
||||
}
|
||||
|
||||
if batch.wal_end_lsn == batch.available_wal_end_lsn {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -792,5 +869,20 @@ mod tests {
|
||||
}
|
||||
|
||||
assert!(done);
|
||||
|
||||
for sender in senders {
|
||||
tracing::info!(
|
||||
"Validating records received by sender with start_lsn={}",
|
||||
sender.start_lsn
|
||||
);
|
||||
|
||||
assert!(sender.received_next_record_lsns.is_sorted());
|
||||
let expected = next_record_lsns
|
||||
.iter()
|
||||
.filter(|lsn| **lsn > sender.start_lsn)
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(sender.received_next_record_lsns, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,6 +122,7 @@ impl Env {
|
||||
start_lsn: Lsn,
|
||||
msg_size: usize,
|
||||
msg_count: usize,
|
||||
mut next_record_lsns: Option<&mut Vec<Lsn>>,
|
||||
) -> anyhow::Result<EndWatch> {
|
||||
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
|
||||
let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
|
||||
@@ -130,7 +131,7 @@ impl Env {
|
||||
|
||||
WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
|
||||
|
||||
let prefix = c"p";
|
||||
let prefix = c"neon-file:";
|
||||
let prefixlen = prefix.to_bytes_with_nul().len();
|
||||
assert!(msg_size >= prefixlen);
|
||||
let message = vec![0; msg_size - prefixlen];
|
||||
@@ -139,6 +140,9 @@ impl Env {
|
||||
&mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
|
||||
for _ in 0..msg_count {
|
||||
let (lsn, record) = walgen.next().unwrap();
|
||||
if let Some(ref mut lsns) = next_record_lsns {
|
||||
lsns.push(lsn);
|
||||
}
|
||||
|
||||
let req = AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
|
||||
@@ -246,7 +246,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let resident_tli = tli.wal_residence_guard().await.unwrap();
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
|
||||
let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let end_pos = end_watch.get();
|
||||
|
||||
@@ -136,7 +136,7 @@ def run_command_and_log_output(command, log_file_path: Path):
|
||||
"LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
|
||||
"PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
|
||||
"PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
|
||||
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
|
||||
"PGOPTIONS": "-c idle_in_transaction_session_timeout=0 -c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
|
||||
}
|
||||
# Combine the current environment with custom variables
|
||||
env = os.environ.copy()
|
||||
|
||||
Reference in New Issue
Block a user