mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 06:00:38 +00:00
Compare commits
4 Commits
jcsp/bytes
...
test_s3_wa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4faf744c36 | ||
|
|
85a515c176 | ||
|
|
aa88279681 | ||
|
|
b2a670c765 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -7116,9 +7116,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.43.0"
|
||||
version = "1.43.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e"
|
||||
checksum = "492a604e2fd7f814268a378409e6c92b5525d747d10db9a229723f55a417958c"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"bytes",
|
||||
|
||||
@@ -183,7 +183,7 @@ test-context = "0.3"
|
||||
thiserror = "1.0"
|
||||
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tokio = { version = "1.41", features = ["macros"] }
|
||||
tokio = { version = "1.43.1", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.12.0"
|
||||
|
||||
@@ -27,9 +27,6 @@ pub(super) enum Name {
|
||||
/// Timeline logical size
|
||||
#[serde(rename = "timeline_logical_size")]
|
||||
LogicalSize,
|
||||
/// Timeline delta from parent
|
||||
#[serde(rename = "timeline_changed_bytes_from_parent")]
|
||||
ChangesFromParent,
|
||||
/// Tenant remote size
|
||||
#[serde(rename = "remote_storage_size")]
|
||||
RemoteSize,
|
||||
@@ -178,18 +175,6 @@ impl MetricsKey {
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
const fn timeline_changed_bytes_from_parent(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> AbsoluteValueFactory {
|
||||
MetricsKey {
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
metric: Name::ChangesFromParent,
|
||||
}
|
||||
.absolute_values()
|
||||
}
|
||||
|
||||
/// [`Tenant::remote_size`]
|
||||
///
|
||||
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
|
||||
@@ -379,7 +364,6 @@ struct TimelineSnapshot {
|
||||
loaded_at: (Lsn, SystemTime),
|
||||
last_record_lsn: Lsn,
|
||||
current_exact_logical_size: Option<u64>,
|
||||
changed_bytes_from_parent: Option<u64>,
|
||||
}
|
||||
|
||||
impl TimelineSnapshot {
|
||||
@@ -415,23 +399,10 @@ impl TimelineSnapshot {
|
||||
}
|
||||
};
|
||||
|
||||
// This is an approximation of how much data has changed on this branch vs. its ancestor: the
|
||||
// number of bytes written to the WAL, clamped to the size of the branch.
|
||||
let changed_bytes_from_parent = current_exact_logical_size.and_then(|size| {
|
||||
if t.get_ancestor_lsn() == Lsn::MAX {
|
||||
None
|
||||
} else {
|
||||
t.get_last_record_lsn()
|
||||
.checked_sub(t.get_ancestor_lsn())
|
||||
.map(|wal_bytes| std::cmp::min(wal_bytes.0, size))
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Some(TimelineSnapshot {
|
||||
loaded_at,
|
||||
last_record_lsn,
|
||||
current_exact_logical_size,
|
||||
changed_bytes_from_parent,
|
||||
}))
|
||||
}
|
||||
}
|
||||
@@ -509,17 +480,6 @@ impl TimelineSnapshot {
|
||||
metrics.push(factory.at(now, size));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let factory = MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id);
|
||||
let current_or_previous = self
|
||||
.changed_bytes_from_parent
|
||||
.or_else(|| cache.get(factory.key()).map(|item| item.value));
|
||||
|
||||
if let Some(size) = current_or_previous {
|
||||
metrics.push(factory.at(now, size));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
loaded_at: (disk_consistent_lsn, SystemTime::now()),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
changed_bytes_from_parent: Some(0x1000),
|
||||
};
|
||||
|
||||
let now = DateTime::<Utc>::from(SystemTime::now());
|
||||
@@ -34,8 +33,7 @@ fn startup_collected_timeline_metrics_before_advancing() {
|
||||
0
|
||||
),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
|
||||
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -62,7 +60,6 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
changed_bytes_from_parent: Some(0x1000),
|
||||
};
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
@@ -72,8 +69,7 @@ fn startup_collected_timeline_metrics_second_round() {
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
|
||||
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -108,7 +104,6 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
loaded_at: (disk_consistent_lsn, init),
|
||||
last_record_lsn: disk_consistent_lsn,
|
||||
current_exact_logical_size: Some(0x42000),
|
||||
changed_bytes_from_parent: Some(0x1000),
|
||||
};
|
||||
|
||||
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
|
||||
@@ -118,8 +113,7 @@ fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
|
||||
&[
|
||||
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
|
||||
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000),
|
||||
MetricsKey::timeline_changed_bytes_from_parent(tenant_id, timeline_id).at(now, 0x1000)
|
||||
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
|
||||
]
|
||||
);
|
||||
}
|
||||
@@ -147,7 +141,6 @@ fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
current_exact_logical_size: None,
|
||||
changed_bytes_from_parent: None,
|
||||
};
|
||||
|
||||
let mut cache = HashMap::from([
|
||||
@@ -209,7 +202,6 @@ fn post_restart_current_exact_logical_size_uses_cached() {
|
||||
loaded_at: (Lsn(50), at_restart),
|
||||
last_record_lsn: Lsn(50),
|
||||
current_exact_logical_size: None,
|
||||
changed_bytes_from_parent: Some(0x1000),
|
||||
};
|
||||
|
||||
let cache = HashMap::from([MetricsKey::timeline_logical_size(tenant_id, timeline_id)
|
||||
|
||||
@@ -274,13 +274,8 @@ typedef struct
|
||||
XLogRecPtr effective_request_lsn;
|
||||
} neon_request_lsns;
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
#else
|
||||
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
#endif
|
||||
extern int64 neon_dbsize(Oid dbNode);
|
||||
|
||||
/* utils for neon relsize cache */
|
||||
|
||||
@@ -3154,13 +3154,8 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
|
||||
* The offsets in request_lsns, buffers, and mask are linked.
|
||||
*/
|
||||
static void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
|
||||
char **buffers, BlockNumber nblocks, const bits8 *mask)
|
||||
#else
|
||||
neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_blockno, neon_request_lsns *request_lsns,
|
||||
void **buffers, BlockNumber nblocks, const bits8 *mask)
|
||||
#endif
|
||||
{
|
||||
NeonResponse *resp;
|
||||
uint64 ring_index;
|
||||
@@ -3356,13 +3351,8 @@ Retry:
|
||||
* To avoid breaking tests in the runtime please keep function signature in sync.
|
||||
*/
|
||||
void
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer)
|
||||
#else
|
||||
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer)
|
||||
#endif
|
||||
{
|
||||
neon_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
|
||||
}
|
||||
|
||||
@@ -50,13 +50,8 @@ PG_FUNCTION_INFO_V1(trigger_segfault);
|
||||
* Linkage to functions in neon module.
|
||||
* The signature here would need to be updated whenever function parameters change in pagestore_smgr.c
|
||||
*/
|
||||
#if PG_MAJORVERSION_NUM < 16
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, char *buffer);
|
||||
#else
|
||||
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
neon_request_lsns request_lsns, void *buffer);
|
||||
#endif
|
||||
|
||||
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
|
||||
|
||||
|
||||
@@ -899,7 +899,7 @@ async fn handle_node_status(req: Request<Body>) -> Result<Response<Body>, ApiErr
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
|
||||
let node_status = state.service.get_node(node_id).await?;
|
||||
let node_status = state.service.get_node(node_id).await?.describe();
|
||||
|
||||
json_response(StatusCode::OK, node_status)
|
||||
}
|
||||
|
||||
@@ -511,5 +511,4 @@ PER_METRIC_VERIFIERS = {
|
||||
"written_data_bytes_delta": WrittenDataDeltaVerifier,
|
||||
"timeline_logical_size": CannotVerifyAnything,
|
||||
"synthetic_storage_size": SyntheticSizeVerifier,
|
||||
"timeline_changed_bytes_from_parent": CannotVerifyAnything,
|
||||
}
|
||||
|
||||
@@ -488,14 +488,13 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
|
||||
assert_prefix_empty(neon_env_builder.safekeepers_remote_storage, prefix)
|
||||
|
||||
|
||||
# This test is flaky, probably because PUTs of local fs storage are not atomic.
|
||||
# Let's keep both remote storage kinds for a while to see if this is the case.
|
||||
# https://github.com/neondatabase/neon/issues/10761
|
||||
@pytest.mark.parametrize("remote_storage_kind", [s3_storage(), RemoteStorageKind.LOCAL_FS])
|
||||
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
|
||||
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
|
||||
# Note: local fs implementation is not protected from concurrent uploads of
|
||||
# the same segment by different safekeepers and makes the test flaky.
|
||||
# https://github.com/neondatabase/neon/issues/10761
|
||||
neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
Reference in New Issue
Block a user