mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-05 19:50:36 +00:00
Compare commits
2 Commits
heikki/deb
...
erik/segme
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ad0739080 | ||
|
|
6b19867410 |
16
.github/workflows/build_and_test.yml
vendored
16
.github/workflows/build_and_test.yml
vendored
@@ -133,38 +133,38 @@ jobs:
|
||||
|
||||
- name: Check vendor/postgres-v14 submodule reference
|
||||
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
|
||||
uses: hlinnaka/submodule-branch-check-action@main
|
||||
uses: jtmullen/submodule-branch-check-action@v1
|
||||
with:
|
||||
path: "vendor/postgres-v14"
|
||||
fetch_depth: "50"
|
||||
sub_fetch_depth: ""
|
||||
sub_fetch_depth: "50"
|
||||
pass_if_unchanged: true
|
||||
|
||||
- name: Check vendor/postgres-v15 submodule reference
|
||||
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
|
||||
uses: hlinnaka/submodule-branch-check-action@main
|
||||
uses: jtmullen/submodule-branch-check-action@v1
|
||||
with:
|
||||
path: "vendor/postgres-v15"
|
||||
fetch_depth: "50"
|
||||
sub_fetch_depth: ""
|
||||
sub_fetch_depth: "50"
|
||||
pass_if_unchanged: true
|
||||
|
||||
- name: Check vendor/postgres-v16 submodule reference
|
||||
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
|
||||
uses: hlinnaka/submodule-branch-check-action@main
|
||||
uses: jtmullen/submodule-branch-check-action@v1
|
||||
with:
|
||||
path: "vendor/postgres-v16"
|
||||
fetch_depth: "50"
|
||||
sub_fetch_depth: ""
|
||||
sub_fetch_depth: "50"
|
||||
pass_if_unchanged: true
|
||||
|
||||
- name: Check vendor/postgres-v17 submodule reference
|
||||
if: steps.check-if-submodules-changed.outputs.vendor == 'true'
|
||||
uses: hlinnaka/submodule-branch-check-action@main
|
||||
uses: jtmullen/submodule-branch-check-action@v1
|
||||
with:
|
||||
path: "vendor/postgres-v17"
|
||||
fetch_depth: "50"
|
||||
sub_fetch_depth: ""
|
||||
sub_fetch_depth: "50"
|
||||
pass_if_unchanged: true
|
||||
|
||||
check-codestyle-rust:
|
||||
|
||||
@@ -241,7 +241,7 @@ pub use v14::bindings::{CheckPoint, ControlFileData};
|
||||
pub const BLCKSZ: u16 = 8192;
|
||||
pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
|
||||
pub const XLOG_BLCKSZ: usize = 8192;
|
||||
pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
|
||||
pub const WAL_SEGMENT_SIZE: usize = 128 * 1024 * 1024;
|
||||
|
||||
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
|
||||
|
||||
|
||||
@@ -151,7 +151,7 @@ fn check_end_of_wal(
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
|
||||
const_assert!(WAL_SEGMENT_SIZE == 128 * 1024 * 1024);
|
||||
|
||||
#[test]
|
||||
pub fn test_find_end_of_wal_simple() {
|
||||
|
||||
@@ -138,6 +138,11 @@ impl Lsn {
|
||||
self.0.checked_sub(other).map(Lsn)
|
||||
}
|
||||
|
||||
/// Subtract a number, saturating at numeric bounds instead of overflowing.
|
||||
pub fn saturating_sub<T: Into<u64>>(self, other: T) -> Lsn {
|
||||
Lsn(self.0.saturating_sub(other.into()))
|
||||
}
|
||||
|
||||
/// Subtract a number, returning the difference as i128 to avoid overflow.
|
||||
pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
|
||||
let other: u64 = other.into();
|
||||
|
||||
@@ -979,7 +979,8 @@ where
|
||||
self.wal_store.flush_wal().await?;
|
||||
}
|
||||
|
||||
// Update commit_lsn.
|
||||
// Update commit_lsn. It will be flushed to the control file regularly by the timeline
|
||||
// manager, off of the WAL ingest hot path.
|
||||
if msg.h.commit_lsn != Lsn(0) {
|
||||
self.update_commit_lsn(msg.h.commit_lsn).await?;
|
||||
}
|
||||
@@ -992,15 +993,6 @@ where
|
||||
self.state.inmem.peer_horizon_lsn =
|
||||
max(self.state.inmem.peer_horizon_lsn, msg.h.truncate_lsn);
|
||||
|
||||
// Update truncate and commit LSN in control file.
|
||||
// To avoid negative impact on performance of extra fsync, do it only
|
||||
// when commit_lsn delta exceeds WAL segment size.
|
||||
if self.state.commit_lsn + (self.state.server.wal_seg_size as u64)
|
||||
< self.state.inmem.commit_lsn
|
||||
{
|
||||
self.state.flush().await?;
|
||||
}
|
||||
|
||||
trace!(
|
||||
"processed AppendRequest of len {}, begin_lsn={}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, flushed={:?}",
|
||||
msg.wal_data.len(),
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use std::{cmp::max, ops::Deref};
|
||||
|
||||
use anyhow::{bail, Result};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::TimelineTermBumpResponse;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::{
|
||||
@@ -144,7 +145,7 @@ impl TimelinePersistentState {
|
||||
ServerInfo {
|
||||
pg_version: 170000, /* Postgres server version (major * 10000) */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
wal_seg_size: 16 * 1024 * 1024,
|
||||
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
||||
},
|
||||
vec![],
|
||||
Lsn::INVALID,
|
||||
|
||||
@@ -515,7 +515,12 @@ impl Manager {
|
||||
return;
|
||||
}
|
||||
|
||||
if state.cfile_last_persist_at.elapsed() > self.conf.control_file_save_interval {
|
||||
if state.cfile_last_persist_at.elapsed() > self.conf.control_file_save_interval
|
||||
// If the control file's commit_lsn lags more than one segment behind the current
|
||||
// commit_lsn, flush immediately to limit recovery time in case of a crash. We don't do
|
||||
// this on the WAL ingest hot path since it incurs fsync latency.
|
||||
|| state.commit_lsn.saturating_sub(state.cfile_commit_lsn).0 >= self.wal_seg_size as u64
|
||||
{
|
||||
let mut write_guard = self.tli.write_shared_state().await;
|
||||
// it should be done in the background because it blocks manager task, but flush() should
|
||||
// be fast enough not to be a problem now
|
||||
|
||||
@@ -5,8 +5,7 @@ import time
|
||||
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
|
||||
|
||||
|
||||
def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
"""Test read replica of a primary which has a logical replication publication"""
|
||||
def test_physical_and_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
env = neon_simple_env
|
||||
|
||||
n_records = 100000
|
||||
@@ -14,6 +13,7 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
config_lines=["min_wal_size=32MB", "max_wal_size=64MB"],
|
||||
)
|
||||
p_con = primary.connect()
|
||||
p_cur = p_con.cursor()
|
||||
@@ -30,6 +30,7 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary,
|
||||
endpoint_id="secondary",
|
||||
config_lines=["min_wal_size=32MB", "max_wal_size=64MB"],
|
||||
)
|
||||
|
||||
s_con = secondary.connect()
|
||||
@@ -47,51 +48,3 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE
|
||||
# Check that LR slot is not copied to replica
|
||||
s_cur.execute("select count(*) from pg_replication_slots")
|
||||
assert s_cur.fetchall()[0][0] == 0
|
||||
|
||||
|
||||
def test_aux_not_logged_at_replica(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
"""Test that AUX files are not saved at replica"""
|
||||
env = neon_simple_env
|
||||
|
||||
n_records = 200000
|
||||
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
)
|
||||
p_con = primary.connect()
|
||||
p_cur = p_con.cursor()
|
||||
p_cur.execute("CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))")
|
||||
p_cur.execute("create publication pub1 for table t")
|
||||
|
||||
# start subscriber
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("CREATE TABLE t(pk bigint primary key, payload text)")
|
||||
connstr = primary.connstr().replace("'", "''")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
for pk in range(n_records):
|
||||
p_cur.execute("insert into t (pk) values (%s)", (pk,))
|
||||
|
||||
# LR snapshot is stored each 15 seconds
|
||||
time.sleep(16)
|
||||
|
||||
# start replica
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary,
|
||||
endpoint_id="secondary",
|
||||
)
|
||||
|
||||
s_con = secondary.connect()
|
||||
s_cur = s_con.cursor()
|
||||
|
||||
logical_replication_sync(vanilla_pg, primary)
|
||||
|
||||
assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == n_records
|
||||
s_cur.execute("select count(*) from t")
|
||||
assert s_cur.fetchall()[0][0] == n_records
|
||||
|
||||
primary.stop()
|
||||
time.sleep(1)
|
||||
secondary.stop()
|
||||
assert not secondary.log_contains("cannot make new WAL entries during recovery")
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: de0a000daf...2199b83fb7
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: fd631a9590...22e580fe9f
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 03b43900ed...e131a9c027
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: ae4cc30dba...9ad2f3c5c3
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.0",
|
||||
"ae4cc30dba24f3910533e5a48e8103c3f2fff300"
|
||||
"9ad2f3c5c37c08069a01c1e3f6b7cf275437e0cb"
|
||||
],
|
||||
"v16": [
|
||||
"16.4",
|
||||
"03b43900edc5d8d6eecec460bfc89aec7174bd84"
|
||||
"e131a9c027b202ce92bd7b9cf2569d48a6f9948e"
|
||||
],
|
||||
"v15": [
|
||||
"15.8",
|
||||
"fd631a959049dfe2b82f67409c8b8b0d3e0016d1"
|
||||
"22e580fe9ffcea7e02592110b1c9bf426d83cada"
|
||||
],
|
||||
"v14": [
|
||||
"14.13",
|
||||
"de0a000dafc2e66ce2e39282d3aa1c704fe0390e"
|
||||
"2199b83fb72680001ce0f43bf6187a21dfb8f45d"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user