Compare commits

...

3 Commits

Author SHA1 Message Date
Christian Schwarz
b144f22be4 [DNM] matrix run test suite with legacy & tiered compaction 2024-05-28 16:29:05 +00:00
Arseny Sher
4a0ce9512b Add safekeeper test truncating WAL.
We do it as a part of more complicated tests like test_compute_restarts, but
let's have a simple test as well.
2024-05-28 11:08:29 +03:00
Konstantin Knizhnik
d61e924103 Fix connect to PS on MacOS/X (#7885)
## Problem

After [0e4f182680] which introduce async
connect
Neon is not able to connect to page server.

## Summary of changes

Perform sync commit at MacOS/X

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-05-27 15:57:57 +03:00
8 changed files with 141 additions and 56 deletions

View File

@@ -432,8 +432,9 @@ jobs:
strategy:
fail-fast: false
matrix:
build_type: [ debug, release ]
pg_version: [ v14, v15, v16 ]
build_type: [ debug ]
pg_version: [ v15 ]
pageserver_compaction_algorithm_kind: [ "legacy", "tiered" ]
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -461,6 +462,9 @@ jobs:
PAGESERVER_GET_VECTORED_IMPL: vectored
PAGESERVER_GET_IMPL: vectored
PAGESERVER_VALIDATE_VEC_GET: true
PAGESERVER_DEFAULT_TENANT_CONFIG_COMPACTION_ALGORITHM: 'kind="${{ matrix.pageserver_compaction_algorithm_kind }}"'
# catch the tests that override `tenant_config` as a whole without specifying the compaction algorithm `kind`
NEON_PAGESERVER_PANIC_ON_UNSPECIFIED_COMPACTION_ALGORITHM: true
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -451,6 +451,8 @@ impl EvictionPolicy {
)]
#[strum(serialize_all = "kebab-case")]
pub enum CompactionAlgorithm {
#[strum(disabled)]
NotSpecified,
Legacy,
Tiered,
}

View File

@@ -5,7 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use pageserver_api::shard::TenantShardId;
use pageserver_api::{models::CompactionAlgorithm, shard::TenantShardId};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde;
use serde::de::IntoDeserializer;
@@ -15,7 +15,7 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
use utils::logging::SecretString;
use once_cell::sync::OnceCell;
use once_cell::sync::{Lazy, OnceCell};
use reqwest::Url;
use std::num::NonZeroUsize;
use std::str::FromStr;
@@ -1067,6 +1067,19 @@ impl PageServerConf {
conf.default_tenant_conf = t_conf.merge(TenantConf::default());
{
const VAR_NAME: &str = "NEON_PAGESERVER_PANIC_ON_UNSPECIFIED_COMPACTION_ALGORITHM";
static VAR: Lazy<Option<bool>> = Lazy::new(|| utils::env::var(VAR_NAME));
if VAR.unwrap_or(false)
&& conf.default_tenant_conf.compaction_algorithm.kind
== CompactionAlgorithm::NotSpecified
{
panic!(
"Unspecified compaction algorithm in default tenant configuration. \
Set the algorithm explicitly in the pageserver.toml's `tenant_config` field or unset the environment variable {VAR_NAME}");
}
}
Ok(conf)
}

View File

@@ -40,8 +40,6 @@ pub mod defaults {
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_COMPACTION_ALGORITHM: super::CompactionAlgorithm =
super::CompactionAlgorithm::Legacy;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
@@ -554,7 +552,13 @@ impl Default for TenantConf {
.expect("cannot parse default compaction period"),
compaction_threshold: DEFAULT_COMPACTION_THRESHOLD,
compaction_algorithm: CompactionAlgorithmSettings {
kind: DEFAULT_COMPACTION_ALGORITHM,
kind: if cfg!(test) {
// Rust tests rely on a valid implicit default (TODO: fix this)
CompactionAlgorithm::Legacy
} else {
// Python tests are subject to NotSpecified handling
CompactionAlgorithm::NotSpecified
},
},
gc_horizon: DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)

View File

@@ -1700,6 +1700,9 @@ impl Timeline {
}
match self.get_compaction_algorithm_settings().kind {
CompactionAlgorithm::NotSpecified => {
unreachable!("should panic earlier when we construct the default tenant conf")
}
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
}

View File

@@ -125,13 +125,6 @@ typedef struct
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_read;
/*---
* WaitEventSet containing:
* - WL_SOCKET_WRITABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_write;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
@@ -336,11 +329,6 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
FreeWaitEventSet(shard->wes_read);
shard->wes_read = NULL;
}
if (shard->wes_write)
{
FreeWaitEventSet(shard->wes_write);
shard->wes_write = NULL;
}
if (shard->conn)
{
PQfinish(shard->conn);
@@ -436,22 +424,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
return false;
}
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
shard->wes_write = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_write, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_write, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_write, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
PQsocket(shard->conn),
NULL, NULL);
shard->state = PS_Connecting_Startup;
/* fallthrough */
}
@@ -460,13 +432,12 @@ pageserver_connect(shardno_t shard_no, int elevel)
char *pagestream_query;
int ps_send_query_ret;
bool connected = false;
int poll_result = PGRES_POLLING_WRITING;
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");
do
{
WaitEvent event;
int poll_result = PQconnectPoll(shard->conn);
switch (poll_result)
{
@@ -497,25 +468,45 @@ pageserver_connect(shardno_t shard_no, int elevel)
}
case PGRES_POLLING_READING:
/* Sleep until there's something to do */
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE,
PQsocket(shard->conn),
0,
PG_WAIT_EXTENSION);
elog(DEBUG5, "PGRES_POLLING_READING=>%d", rc);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_READABLE)
break;
}
/* PQconnectPoll() handles the socket polling state updates */
break;
case PGRES_POLLING_WRITING:
/* Sleep until there's something to do */
(void) WaitEventSetWait(shard->wes_write, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE,
PQsocket(shard->conn),
0,
PG_WAIT_EXTENSION);
elog(DEBUG5, "PGRES_POLLING_WRITING=>%d", rc);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_WRITEABLE)
break;
}
/* PQconnectPoll() handles the socket polling state updates */
break;
@@ -524,12 +515,22 @@ pageserver_connect(shardno_t shard_no, int elevel)
connected = true;
break;
}
poll_result = PQconnectPoll(shard->conn);
elog(DEBUG5, "PQconnectPoll=>%d", poll_result);
}
while (!connected);
/* No more polling needed; connection succeeded */
shard->last_connect_time = GetCurrentTimestamp();
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
switch (neon_protocol_version)
{
case 2:

View File

@@ -584,9 +584,9 @@ prefetch_read(PrefetchRequest *slot)
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch read: status=%d response=%llx my=%llu receive=%llu",
slot->status, (size_t) (void *) slot->response,
slot->my_ring_index, MyPState->ring_receive);
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(slot->shard_no);
@@ -606,8 +606,8 @@ prefetch_read(PrefetchRequest *slot)
else
{
neon_shard_log(slot->shard_no, WARNING,
"No response from reading prefetch entry %llu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
slot->my_ring_index,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long)slot->my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
slot->buftag.forkNum, slot->buftag.blockNum);
return false;

View File

@@ -531,6 +531,64 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_recovery_uncommitted(env))
async def run_wal_truncation(env: NeonEnv):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(sk1, sk2, sk3) = env.safekeepers
ep = env.endpoints.create_start("main")
ep.safe_psql("create table t (key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
# stop sk3 as well
sk3.stop()
# now start sk1 and sk2 and make them commit something
sk1.start()
sk2.start()
ep = env.endpoints.create_start(
"main",
)
ep.safe_psql("insert into t select generate_series(1, 200), 'payload'")
# start sk3 and wait for it to catch up
sk3.start()
flush_lsn = Lsn(ep.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()"))
await wait_for_lsn(sk3, tenant_id, timeline_id, flush_lsn)
timeline_start_lsn = sk1.get_timeline_start_lsn(tenant_id, timeline_id)
digests = [
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, flush_lsn)
for sk in [sk1, sk2]
]
assert digests[0] == digests[1], f"digest on sk1 is {digests[0]} but on sk3 is {digests[1]}"
# Simple deterministic test creating tail of WAL on safekeeper which is
# truncated when majority without this sk elects walproposer starting earlier.
def test_wal_truncation(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_wal_truncation(env))
async def run_segment_init_failure(env: NeonEnv):
env.neon_cli.create_branch("test_segment_init_failure")
ep = env.endpoints.create_start("test_segment_init_failure")