mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-02 10:10:37 +00:00
Compare commits
18 Commits
problame/p
...
arpad/less
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e97e86eb43 | ||
|
|
c9370d48de | ||
|
|
e6da7e29ed | ||
|
|
0353a72a00 | ||
|
|
ce4d3da3ae | ||
|
|
5da3e2113a | ||
|
|
6f714c308b | ||
|
|
2e6afaa642 | ||
|
|
8f0a0440ba | ||
|
|
987dc01ed7 | ||
|
|
719e4ad580 | ||
|
|
e61b2a08b3 | ||
|
|
cc89b46ae5 | ||
|
|
d5cbdd2e90 | ||
|
|
6ad9c3560e | ||
|
|
9dc3b09e57 | ||
|
|
fe762e35d8 | ||
|
|
0c4988a92c |
@@ -48,6 +48,10 @@ inputs:
|
||||
description: 'benchmark durations JSON'
|
||||
required: false
|
||||
default: '{}'
|
||||
session_timeout:
|
||||
description: 'Session timeout for the test suite'
|
||||
required: false
|
||||
default: ''
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
@@ -107,6 +111,7 @@ runs:
|
||||
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
|
||||
RERUN_FLAKY: ${{ inputs.rerun_flaky }}
|
||||
PG_VERSION: ${{ inputs.pg_version }}
|
||||
SESSION_TIMEOUT: ${{ inputs.session_timeout }}
|
||||
shell: bash -euxo pipefail {0}
|
||||
run: |
|
||||
# PLATFORM will be embedded in the perf test report
|
||||
@@ -168,6 +173,10 @@ runs:
|
||||
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
|
||||
fi
|
||||
|
||||
if [ -n "${SESSION_TIMEOUT}" ]; then
|
||||
EXTRA_PARAMS="--session-timeout ${SESSION_TIMEOUT} ${EXTRA_PARAMS}"
|
||||
fi
|
||||
|
||||
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
|
||||
|
||||
5
.github/workflows/build_and_test.yml
vendored
5
.github/workflows/build_and_test.yml
vendored
@@ -461,7 +461,8 @@ jobs:
|
||||
|
||||
- name: Pytest regression tests
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
timeout-minutes: 60
|
||||
# Hard timeout to prevent hanging tests, we also have set softer pytest timeout (set via `session_timeout`) which is shorter
|
||||
timeout-minutes: 110
|
||||
with:
|
||||
build_type: ${{ matrix.build_type }}
|
||||
test_selection: regress
|
||||
@@ -471,6 +472,8 @@ jobs:
|
||||
real_s3_region: eu-central-1
|
||||
rerun_flaky: true
|
||||
pg_version: ${{ matrix.pg_version }}
|
||||
# Set pytest session timeout to 25 minutes
|
||||
session_timeout: '1500'
|
||||
env:
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3638,7 +3638,6 @@ dependencies = [
|
||||
"arc-swap",
|
||||
"async-compression",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"camino",
|
||||
@@ -4107,7 +4106,6 @@ name = "postgres_backend"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"futures",
|
||||
"once_cell",
|
||||
|
||||
@@ -837,6 +837,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
let allow_multiple = sub_args.get_flag("allow-multiple");
|
||||
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
@@ -854,7 +856,9 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
_ => {}
|
||||
}
|
||||
|
||||
cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
|
||||
if !allow_multiple {
|
||||
cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
|
||||
}
|
||||
|
||||
cplane.new_endpoint(
|
||||
&endpoint_id,
|
||||
@@ -883,6 +887,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
|
||||
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
|
||||
|
||||
let allow_multiple = sub_args.get_flag("allow-multiple");
|
||||
|
||||
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
||||
let safekeepers =
|
||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||
@@ -908,11 +914,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
cplane.check_conflicting_endpoints(
|
||||
endpoint.mode,
|
||||
endpoint.tenant_id,
|
||||
endpoint.timeline_id,
|
||||
)?;
|
||||
if !allow_multiple {
|
||||
cplane.check_conflicting_endpoints(
|
||||
endpoint.mode,
|
||||
endpoint.tenant_id,
|
||||
endpoint.timeline_id,
|
||||
)?;
|
||||
}
|
||||
|
||||
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
|
||||
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
|
||||
@@ -1444,6 +1452,12 @@ fn cli() -> Command {
|
||||
.help("If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`")
|
||||
.required(false);
|
||||
|
||||
let allow_multiple = Arg::new("allow-multiple")
|
||||
.help("Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests.")
|
||||
.long("allow-multiple")
|
||||
.action(ArgAction::SetTrue)
|
||||
.required(false);
|
||||
|
||||
Command::new("Neon CLI")
|
||||
.arg_required_else_help(true)
|
||||
.version(GIT_VERSION)
|
||||
@@ -1601,6 +1615,7 @@ fn cli() -> Command {
|
||||
.arg(pg_version_arg.clone())
|
||||
.arg(hot_standby_arg.clone())
|
||||
.arg(update_catalog)
|
||||
.arg(allow_multiple.clone())
|
||||
)
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
@@ -1609,6 +1624,7 @@ fn cli() -> Command {
|
||||
.arg(safekeepers_arg)
|
||||
.arg(remote_ext_config_args)
|
||||
.arg(create_test_user)
|
||||
.arg(allow_multiple.clone())
|
||||
)
|
||||
.subcommand(Command::new("reconfigure")
|
||||
.about("Reconfigure the endpoint")
|
||||
|
||||
@@ -5,7 +5,6 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@@ -78,17 +78,16 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
|
||||
)
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Handler<IO> {
|
||||
/// Handle single query.
|
||||
/// postgres_backend will issue ReadyForQuery after calling this (this
|
||||
/// might be not what we want after CopyData streaming, but currently we don't
|
||||
/// care). It will also flush out the output buffer.
|
||||
async fn process_query(
|
||||
fn process_query(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
query_string: &str,
|
||||
) -> Result<(), QueryError>;
|
||||
) -> impl Future<Output = Result<(), QueryError>> + Send;
|
||||
|
||||
/// Called on startup packet receival, allows to process params.
|
||||
///
|
||||
|
||||
@@ -22,7 +22,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
|
||||
|
||||
struct TestHandler {}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
|
||||
// return single col 'hey' for any query
|
||||
async fn process_query(
|
||||
|
||||
@@ -331,7 +331,10 @@ impl CheckPoint {
|
||||
/// Returns 'true' if the XID was updated.
|
||||
pub fn update_next_xid(&mut self, xid: u32) -> bool {
|
||||
// nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
|
||||
let mut new_xid = std::cmp::max(xid.wrapping_add(1), pg_constants::FIRST_NORMAL_TRANSACTION_ID);
|
||||
let mut new_xid = std::cmp::max(
|
||||
xid.wrapping_add(1),
|
||||
pg_constants::FIRST_NORMAL_TRANSACTION_ID,
|
||||
);
|
||||
// To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
|
||||
// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
|
||||
new_xid =
|
||||
@@ -367,8 +370,16 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
|
||||
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
|
||||
|
||||
let first_page_only = seg_off < XLOG_BLCKSZ;
|
||||
let (shdr_rem_len, infoflags) = if first_page_only {
|
||||
(seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
|
||||
// If first records starts in the middle of the page, pretend in page header
|
||||
// there is a fake record which ends where first real record starts. This
|
||||
// makes pg_waldump etc happy.
|
||||
let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
|
||||
assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
|
||||
// xlp_rem_len doesn't include page header, hence the subtraction.
|
||||
(
|
||||
seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
|
||||
pg_constants::XLP_FIRST_IS_CONTRECORD,
|
||||
)
|
||||
} else {
|
||||
(0, 0)
|
||||
};
|
||||
@@ -397,20 +408,22 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
|
||||
|
||||
if !first_page_only {
|
||||
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
|
||||
// see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
|
||||
let (xlp_rem_len, xlp_info) = if page_off > 0 {
|
||||
assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
|
||||
(
|
||||
(page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
|
||||
pg_constants::XLP_FIRST_IS_CONTRECORD,
|
||||
)
|
||||
} else {
|
||||
(0, 0)
|
||||
};
|
||||
let header = XLogPageHeaderData {
|
||||
xlp_magic: XLOG_PAGE_MAGIC as u16,
|
||||
xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
|
||||
pg_constants::XLP_FIRST_IS_CONTRECORD
|
||||
} else {
|
||||
0
|
||||
},
|
||||
xlp_info,
|
||||
xlp_tli: PG_TLI,
|
||||
xlp_pageaddr: lsn.page_lsn().0,
|
||||
xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
|
||||
page_off as u32
|
||||
} else {
|
||||
0u32
|
||||
},
|
||||
xlp_rem_len,
|
||||
..Default::default() // Put 0 in padding fields.
|
||||
};
|
||||
let hdr_bytes = header.encode()?;
|
||||
|
||||
@@ -15,7 +15,6 @@ anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-compression.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
camino.workspace = true
|
||||
|
||||
@@ -1384,7 +1384,6 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
|
||||
3000
poetry.lock
generated
3000
poetry.lock
generated
File diff suppressed because one or more lines are too long
@@ -75,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
|
||||
|
||||
// TODO: replace with an http-based protocol.
|
||||
struct MgmtHandler;
|
||||
#[async_trait::async_trait]
|
||||
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
|
||||
async fn process_query(
|
||||
&mut self,
|
||||
|
||||
@@ -23,7 +23,7 @@ moto = {extras = ["server"], version = "^4.1.2"}
|
||||
backoff = "^2.2.1"
|
||||
pytest-lazy-fixture = "^0.6.3"
|
||||
prometheus-client = "^0.14.1"
|
||||
pytest-timeout = "^2.1.0"
|
||||
pytest-timeout = "^2.3.1"
|
||||
Werkzeug = "^3.0.1"
|
||||
pytest-order = "^1.1.0"
|
||||
allure-pytest = "^2.13.2"
|
||||
|
||||
@@ -95,7 +95,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
for SafekeeperPostgresHandler
|
||||
{
|
||||
|
||||
@@ -725,6 +725,18 @@ where
|
||||
self.state.inmem.commit_lsn
|
||||
);
|
||||
|
||||
// Before first WAL write initialize its segment. It makes first segment
|
||||
// pg_waldump'able because stream from compute doesn't include its
|
||||
// segment and page headers.
|
||||
//
|
||||
// If we fail before first WAL write flush this action would be
|
||||
// repeated, that's ok because it is idempotent.
|
||||
if self.wal_store.flush_lsn() == Lsn::INVALID {
|
||||
self.wal_store
|
||||
.initialize_first_segment(msg.start_streaming_at)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
|
||||
// intersection of our history and history from msg
|
||||
|
||||
@@ -1007,6 +1019,10 @@ mod tests {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
self.lsn = startpos + buf.len() as u64;
|
||||
Ok(())
|
||||
|
||||
@@ -38,6 +38,12 @@ pub trait Storage {
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
|
||||
/// Initialize segment by creating proper long header at the beginning of
|
||||
/// the segment and short header at the page of given LSN. This is only used
|
||||
/// for timeline initialization because compute will stream data only since
|
||||
/// init_lsn. Other segment headers are included in compute stream.
|
||||
async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()>;
|
||||
|
||||
/// Write piece of WAL from buf to disk, but not necessarily sync it.
|
||||
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
|
||||
|
||||
@@ -78,6 +84,8 @@ pub struct PhysicalStorage {
|
||||
|
||||
/// Size of WAL segment in bytes.
|
||||
wal_seg_size: usize,
|
||||
pg_version: u32,
|
||||
system_id: u64,
|
||||
|
||||
/// Written to disk, but possibly still in the cache and not fully persisted.
|
||||
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
|
||||
@@ -169,6 +177,8 @@ impl PhysicalStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
wal_seg_size,
|
||||
pg_version: state.server.pg_version,
|
||||
system_id: state.server.system_id,
|
||||
write_lsn,
|
||||
write_record_lsn: write_lsn,
|
||||
flush_record_lsn: flush_lsn,
|
||||
@@ -324,6 +334,20 @@ impl Storage for PhysicalStorage {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
|
||||
async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
|
||||
let segno = init_lsn.segment_number(self.wal_seg_size);
|
||||
let (mut file, _) = self.open_or_create(segno).await?;
|
||||
let major_pg_version = self.pg_version / 10000;
|
||||
let wal_seg =
|
||||
postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&wal_seg).await?;
|
||||
file.flush().await?;
|
||||
info!("initialized segno {} at lsn {}", segno, init_lsn);
|
||||
// note: file is *not* fsynced
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write WAL to disk.
|
||||
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
// Disallow any non-sequential writes, which can result in gaps or overwrites.
|
||||
|
||||
@@ -182,6 +182,10 @@ impl wal_storage::Storage for DiskWALStorage {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
|
||||
async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write piece of WAL from buf to disk, but not necessarily sync it.
|
||||
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
if self.write_lsn != startpos {
|
||||
|
||||
@@ -1801,6 +1801,7 @@ class NeonCli(AbstractNeonCli):
|
||||
hot_standby: bool = False,
|
||||
lsn: Optional[Lsn] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple=False,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1824,6 +1825,8 @@ class NeonCli(AbstractNeonCli):
|
||||
args.extend(["--hot-standby", "true"])
|
||||
if pageserver_id is not None:
|
||||
args.extend(["--pageserver-id", str(pageserver_id)])
|
||||
if allow_multiple:
|
||||
args.extend(["--allow-multiple"])
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
@@ -1835,6 +1838,7 @@ class NeonCli(AbstractNeonCli):
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple=False,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1849,6 +1853,8 @@ class NeonCli(AbstractNeonCli):
|
||||
args.append(endpoint_id)
|
||||
if pageserver_id is not None:
|
||||
args.extend(["--pageserver-id", str(pageserver_id)])
|
||||
if allow_multiple:
|
||||
args.extend(["--allow-multiple"])
|
||||
|
||||
res = self.raw_cli(args)
|
||||
res.check_returncode()
|
||||
@@ -3299,6 +3305,7 @@ class Endpoint(PgProtocol):
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple: bool = False,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create a new Postgres endpoint.
|
||||
@@ -3321,6 +3328,7 @@ class Endpoint(PgProtocol):
|
||||
pg_port=self.pg_port,
|
||||
http_port=self.http_port,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
|
||||
@@ -3337,7 +3345,10 @@ class Endpoint(PgProtocol):
|
||||
return self
|
||||
|
||||
def start(
|
||||
self, remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None
|
||||
self,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple: bool = False,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Start the Postgres instance.
|
||||
@@ -3353,6 +3364,7 @@ class Endpoint(PgProtocol):
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
self.running = True
|
||||
|
||||
@@ -3482,6 +3494,7 @@ class Endpoint(PgProtocol):
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
allow_multiple=False,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create an endpoint, apply config, and start Postgres.
|
||||
@@ -3497,7 +3510,12 @@ class Endpoint(PgProtocol):
|
||||
hot_standby=hot_standby,
|
||||
lsn=lsn,
|
||||
pageserver_id=pageserver_id,
|
||||
).start(remote_ext_config=remote_ext_config, pageserver_id=pageserver_id)
|
||||
allow_multiple=allow_multiple,
|
||||
).start(
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
|
||||
log.info(f"Postgres startup took {time.time() - started_at} seconds")
|
||||
|
||||
|
||||
@@ -19,6 +19,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder, WalCraft
|
||||
def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_crafted_wal_end")
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# seems like pageserver stop triggers these
|
||||
".*initial size calculation failed.*Bad state (not active).*",
|
||||
]
|
||||
)
|
||||
|
||||
endpoint = env.endpoints.create("test_crafted_wal_end")
|
||||
wal_craft = WalCraft(env)
|
||||
|
||||
@@ -1,13 +1,28 @@
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
from fixtures.utils import subprocess_capture
|
||||
|
||||
|
||||
def check_wal_segment(pg_waldump_path: str, segment_path: str, test_output_dir):
|
||||
# use special --ignore option to ignore the validation checks in pg_waldump
|
||||
# this is necessary, because neon WAL files contain gap at the beginning
|
||||
output_path, _, _ = subprocess_capture(
|
||||
test_output_dir, [pg_waldump_path, "--ignore", segment_path]
|
||||
)
|
||||
|
||||
with open(f"{output_path}.stdout", "r") as f:
|
||||
stdout = f.read()
|
||||
assert "ABORT" in stdout
|
||||
assert "COMMIT" in stdout
|
||||
|
||||
|
||||
# Simple test to check that pg_waldump works with neon WAL files
|
||||
def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_pg_waldump", "empty")
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_pg_waldump", "empty")
|
||||
endpoint = env.endpoints.create_start("test_pg_waldump")
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
@@ -35,12 +50,12 @@ def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
|
||||
assert endpoint.pgdata_dir
|
||||
wal_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
|
||||
pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump")
|
||||
# check segment on compute
|
||||
check_wal_segment(pg_waldump_path, wal_path, test_output_dir)
|
||||
|
||||
# use special --ignore option to ignore the validation checks in pg_waldump
|
||||
# this is necessary, because neon WAL files contain gap at the beginning
|
||||
output_path, _, _ = subprocess_capture(test_output_dir, [pg_waldump_path, "--ignore", wal_path])
|
||||
|
||||
with open(f"{output_path}.stdout", "r") as f:
|
||||
stdout = f.read()
|
||||
assert "ABORT" in stdout
|
||||
assert "COMMIT" in stdout
|
||||
# Check file on safekeepers as well. pg_waldump is strict about file naming, so remove .partial suffix.
|
||||
sk = env.safekeepers[0]
|
||||
sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id)
|
||||
non_partial_path = os.path.join(sk_tli_dir, "000000010000000000000001")
|
||||
shutil.copyfile(os.path.join(sk_tli_dir, "000000010000000000000001.partial"), non_partial_path)
|
||||
check_wal_segment(pg_waldump_path, non_partial_path, test_output_dir)
|
||||
|
||||
@@ -254,7 +254,9 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
|
||||
def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
|
||||
def endpoint_create_start(
|
||||
env: NeonEnv, branch: str, pgdir_name: Optional[str], allow_multiple: bool = False
|
||||
):
|
||||
endpoint = Endpoint(
|
||||
env,
|
||||
tenant_id=env.initial_tenant,
|
||||
@@ -268,14 +270,23 @@ def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
|
||||
# embed current time in endpoint ID
|
||||
endpoint_id = pgdir_name or f"ep-{time.time()}"
|
||||
return endpoint.create_start(
|
||||
branch_name=branch, endpoint_id=endpoint_id, config_lines=["log_statement=all"]
|
||||
branch_name=branch,
|
||||
endpoint_id=endpoint_id,
|
||||
config_lines=["log_statement=all"],
|
||||
allow_multiple=allow_multiple,
|
||||
)
|
||||
|
||||
|
||||
async def exec_compute_query(
|
||||
env: NeonEnv, branch: str, query: str, pgdir_name: Optional[str] = None
|
||||
env: NeonEnv,
|
||||
branch: str,
|
||||
query: str,
|
||||
pgdir_name: Optional[str] = None,
|
||||
allow_multiple: bool = False,
|
||||
):
|
||||
with endpoint_create_start(env, branch=branch, pgdir_name=pgdir_name) as endpoint:
|
||||
with endpoint_create_start(
|
||||
env, branch=branch, pgdir_name=pgdir_name, allow_multiple=allow_multiple
|
||||
) as endpoint:
|
||||
before_conn = time.time()
|
||||
conn = await endpoint.connect_async()
|
||||
res = await conn.fetch(query)
|
||||
@@ -347,6 +358,7 @@ class BackgroundCompute(object):
|
||||
self.branch,
|
||||
f"INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key",
|
||||
pgdir_name=f"bgcompute{self.index}_key{verify_key}",
|
||||
allow_multiple=True,
|
||||
)
|
||||
log.info(f"result: {res}")
|
||||
if len(res) != 1:
|
||||
|
||||
Reference in New Issue
Block a user