Compare commits

...

4 Commits

Author SHA1 Message Date
John Spray
b58e9ef05b tests: add test_image_layer_reads 2024-09-27 18:09:57 +01:00
John Spray
c2c9530ab7 hack: log layer accesses 2024-09-27 16:49:59 +01:00
John Spray
c62f45fff4 hack: always repartition 2024-09-27 16:49:42 +01:00
John Spray
6a9d51b41f pageserver: unit test for case of LayerMap::search at same LSN as image layer 2024-09-27 13:39:03 +01:00
7 changed files with 130 additions and 23 deletions

View File

@@ -997,6 +997,7 @@ impl PageServerHandler {
)
.await?;
tracing::info!("get_rel_page_at_lsn: {lsn}");
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;

View File

@@ -1470,4 +1470,52 @@ mod tests {
LayerVisibilityHint::Visible
));
}
/// Exercise edge case of querying at exactly the LSN of an image layer
#[test]
fn layer_search_at_image_lsn() {
    let tenant_id = TenantId::generate();
    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    let timeline_id = TimelineId::generate();
    let last_record_lsn = Lsn::from_hex("00000000DEADBEEF").unwrap();

    let mut map = LayerMap::default();
    let mut batch = map.batch_update();

    // A full-keyspace image layer sitting exactly at `last_record_lsn`.
    let image = PersistentLayerDesc {
        key_range: Key::from_i128(0)..Key::from_i128(i128::MAX),
        lsn_range: PersistentLayerDesc::image_layer_lsn_range(last_record_lsn),
        tenant_shard_id,
        timeline_id,
        is_delta: false,
        file_size: 123,
    };
    // A full-keyspace delta layer whose LSN range ends below the image layer.
    let delta = PersistentLayerDesc {
        key_range: Key::from_i128(0)..Key::from_i128(i128::MAX),
        lsn_range: Lsn(0)..Lsn(0xdead0000),
        tenant_shard_id,
        timeline_id,
        is_delta: true,
        file_size: 123,
    };
    batch.insert_historic(image.clone());
    batch.insert_historic(delta);
    batch.flush();

    // FIXME: according to the search() docstring, it searches for layers with start LSNs
    // _less than_ `end_lsn` -- i.e. it's correct that if you ask for exactly the LSN of an
    // image layer, it shouldn't hit it. However, the way that page_service calls it is to
    // take the last_record_lsn of a Timeline and pass that directly into LayerMap::search().
    let hit = map
        .search(Key::from_i128(12345), last_record_lsn)
        .unwrap();

    // We searched at the LSN of the image layer: we should hit it
    assert_eq!(hit.layer.as_ref(), &image);
}
}

View File

@@ -433,6 +433,7 @@ impl ReadableLayer {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
tracing::info!("get_values_reconstruct_data: {:?}", self.id());
match self {
ReadableLayer::PersistentLayer(layer) => {
layer

View File

@@ -3856,21 +3856,21 @@ impl Timeline {
)));
}
let distance = lsn.0 - partition_lsn.0;
if *partition_lsn != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok((
(dense_partition.clone(), sparse_partition.clone()),
*partition_lsn,
));
}
// let distance = lsn.0 - partition_lsn.0;
// if *partition_lsn != Lsn(0)
// && distance <= self.repartition_threshold
// && !flags.contains(CompactFlags::ForceRepartition)
// {
// debug!(
// distance,
// threshold = self.repartition_threshold,
// "no repartitioning needed"
// );
// return Ok((
// (dense_partition.clone(), sparse_partition.clone()),
// *partition_lsn,
// ));
// }
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
@@ -5779,6 +5779,7 @@ impl<'a> TimelineWriter<'a> {
/// the 'lsn' or anything older. The previous last record LSN is stored alongside
/// the latest and can be read.
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
tracing::info!("finish_write @ {new_lsn}");
self.tl.finish_write(new_lsn);
}

View File

@@ -364,6 +364,10 @@ impl Timeline {
// 3. Create new image layers for partitions that have been modified
// "enough". Skip image layer creation if L0 compaction cannot keep up.
if fully_compacted {
tracing::info!(
"create_image_layers @ {lsn} (latest {})",
self.get_last_record_lsn()
);
let image_layers = self
.create_image_layers(
&partitioning,

View File

@@ -56,20 +56,32 @@ class Workload:
with ENDPOINT_LOCK:
self._endpoint.reconfigure()
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
def go_readonly(self):
    # Tear down the current endpoint and replace it with a read-only (hot standby)
    # endpoint on the same branch: make_endpoint(readonly=True) sets hot_standby.
    # NOTE(review): assumes stop() leaves self._endpoint safe to discard -- confirm.
    self.stop()
    self._endpoint = self.make_endpoint(readonly=True, pageserver_id=None)
    self._endpoint.start(pageserver_id=None)
def make_endpoint(self, readonly: bool, pageserver_id: Optional[int] = None) -> Endpoint:
    """Create (but do not start) an endpoint for this workload's branch.

    Args:
        readonly: if True, create the endpoint as a hot standby.
        pageserver_id: optional pageserver to pin the endpoint to.

    Returns:
        The newly created Endpoint.
    """
    # We may be running alongside other Workloads for different tenants. Full TTID is
    # obnoxiously long for use here, but a cut-down version is still unique enough for tests.
    endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"

    # Apply the readonly flag to a local copy of the options: mutating
    # self._endpoint_opts here would leak hot_standby=True into every endpoint
    # created later by this Workload, including read-write ones.
    opts = dict(self._endpoint_opts)
    if readonly:
        opts["hot_standby"] = True

    return self.env.endpoints.create(
        self.branch_name,
        tenant_id=self.tenant_id,
        pageserver_id=pageserver_id,
        endpoint_id=endpoint_id,
        **opts,
    )
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
with ENDPOINT_LOCK:
if self._endpoint is None:
self._endpoint = self.env.endpoints.create(
self.branch_name,
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=endpoint_id,
**self._endpoint_opts,
)
self._endpoint = self.make_endpoint(pageserver_id=pageserver_id, readonly=False)
self._endpoint.start(pageserver_id=pageserver_id)
else:
self._endpoint.reconfigure(pageserver_id=pageserver_id)

View File

@@ -11,6 +11,7 @@ from fixtures.neon_fixtures import (
generate_uploads_and_deletions,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.utils import wait_until
from fixtures.workload import Workload
@@ -412,3 +413,42 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool
f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)"
)
assert res[0][0] == 1
def test_image_layer_reads(neon_env_builder: NeonEnvBuilder):
    """Exercise getpage reads at exactly the LSN of freshly created image layers.

    Writes some rows, switches the endpoint to read-only so no new WAL is
    produced, forces image layer creation at the resulting LSN, and then
    validates reads -- which therefore land at the image layers' own LSN.
    """
    env = neon_env_builder.init_start()
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    # compaction_period=0s: presumably disables background compaction so layers
    # only change when we call timeline_compact explicitly below -- confirm.
    env.pageserver.http_client().set_tenant_config(
        tenant_id,
        {
            "compaction_period": "0s",
        },
    )

    workload = Workload(env, tenant_id, timeline_id)
    workload.init()
    workload.write_rows(256)
    workload.validate()

    # Go read-only: from here on the commit LSN should stay put (asserted at the end).
    workload.go_readonly()

    # Wait for the pageserver to ingest everything the safekeeper has committed.
    commit_lsn = env.safekeepers[0].http_client().get_commit_lsn(tenant_id, timeline_id)
    wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, commit_lsn)
    log.info(f"Ingested up to commit_lsn {commit_lsn}")

    # Force creation of image layers at the current (frozen) LSN.
    env.pageserver.http_client().timeline_compact(
        tenant_id, timeline_id, force_image_layer_creation=True
    )

    # Uncomment this checkpoint, and the logs will show getpage requests hitting the image layers we
    # just created. However, without the checkpoint, getpage requests will hit one InMemoryLayer and
    # one persistent delta layer.
    # env.pageserver.http_client().timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

    # This should send getpage requests at the same LSN where we just created image layers
    workload.validate()

    # Nothing should have written in the meantime
    assert commit_lsn == env.safekeepers[0].http_client().get_commit_lsn(tenant_id, timeline_id)