mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 08:30:37 +00:00
Compare commits
4 Commits
jcsp/foo-b
...
jcsp/layer
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d530aab105 | ||
|
|
c2c9530ab7 | ||
|
|
c62f45fff4 | ||
|
|
6a9d51b41f |
@@ -1,8 +1,6 @@
|
||||
[](https://neon.tech)
|
||||
|
||||
|
||||
foo
|
||||
|
||||
|
||||
# Neon
|
||||
|
||||
|
||||
@@ -997,6 +997,7 @@ impl PageServerHandler {
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!("get_rel_page_at_lsn: {lsn}");
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
|
||||
@@ -1470,4 +1470,52 @@ mod tests {
|
||||
LayerVisibilityHint::Visible
|
||||
));
|
||||
}
|
||||
|
||||
/// Exercise edge case of querying at exactly the LSN of an image layer
|
||||
#[test]
|
||||
fn layer_search_at_image_lsn() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let last_record_lsn = Lsn::from_hex("00000000DEADBEEF").unwrap();
|
||||
|
||||
let mut layer_map = LayerMap::default();
|
||||
let mut updates = layer_map.batch_update();
|
||||
|
||||
let image_layer = PersistentLayerDesc {
|
||||
key_range: Key::from_i128(0)..Key::from_i128(i128::MAX),
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(last_record_lsn),
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
is_delta: false,
|
||||
file_size: 123,
|
||||
};
|
||||
|
||||
let delta_layer = PersistentLayerDesc {
|
||||
key_range: Key::from_i128(0)..Key::from_i128(i128::MAX),
|
||||
lsn_range: Lsn(0)..Lsn(0xdead0000),
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
is_delta: true,
|
||||
file_size: 123,
|
||||
};
|
||||
|
||||
updates.insert_historic(image_layer.clone());
|
||||
updates.insert_historic(delta_layer);
|
||||
|
||||
updates.flush();
|
||||
|
||||
// FIXME: according to the search() docstring, it searches for layers with start LSNs _less then_
|
||||
// `end_lsn` -- i.e. it's correct that if you ask for exactly the LSN of an image layer, it shouldn't hit
|
||||
// it. However, the way that page_service calls it is to take the last_record_lsn of a Timeline
|
||||
// and pass that directly into LayerMap::search().
|
||||
|
||||
let searched = layer_map
|
||||
.search(Key::from_i128(12345), last_record_lsn)
|
||||
.unwrap();
|
||||
|
||||
// We searched at the LSN of the image layer: we should hit it
|
||||
assert_eq!(searched.layer.as_ref(), &image_layer);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -433,6 +433,7 @@ impl ReadableLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
tracing::info!("get_values_reconstruct_data: {:?}", self.id());
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
layer
|
||||
|
||||
@@ -3856,21 +3856,21 @@ impl Timeline {
|
||||
)));
|
||||
}
|
||||
|
||||
let distance = lsn.0 - partition_lsn.0;
|
||||
if *partition_lsn != Lsn(0)
|
||||
&& distance <= self.repartition_threshold
|
||||
&& !flags.contains(CompactFlags::ForceRepartition)
|
||||
{
|
||||
debug!(
|
||||
distance,
|
||||
threshold = self.repartition_threshold,
|
||||
"no repartitioning needed"
|
||||
);
|
||||
return Ok((
|
||||
(dense_partition.clone(), sparse_partition.clone()),
|
||||
*partition_lsn,
|
||||
));
|
||||
}
|
||||
// let distance = lsn.0 - partition_lsn.0;
|
||||
// if *partition_lsn != Lsn(0)
|
||||
// && distance <= self.repartition_threshold
|
||||
// && !flags.contains(CompactFlags::ForceRepartition)
|
||||
// {
|
||||
// debug!(
|
||||
// distance,
|
||||
// threshold = self.repartition_threshold,
|
||||
// "no repartitioning needed"
|
||||
// );
|
||||
// return Ok((
|
||||
// (dense_partition.clone(), sparse_partition.clone()),
|
||||
// *partition_lsn,
|
||||
// ));
|
||||
// }
|
||||
|
||||
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
|
||||
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
|
||||
@@ -5779,6 +5779,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
/// the 'lsn' or anything older. The previous last record LSN is stored alongside
|
||||
/// the latest and can be read.
|
||||
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
|
||||
tracing::info!("finish_write @ {new_lsn}");
|
||||
self.tl.finish_write(new_lsn);
|
||||
}
|
||||
|
||||
|
||||
@@ -364,6 +364,10 @@ impl Timeline {
|
||||
// 3. Create new image layers for partitions that have been modified
|
||||
// "enough". Skip image layer creation if L0 compaction cannot keep up.
|
||||
if fully_compacted {
|
||||
tracing::info!(
|
||||
"create_image_layers @ {lsn} (latest {})",
|
||||
self.get_last_record_lsn()
|
||||
);
|
||||
let image_layers = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
|
||||
@@ -56,20 +56,32 @@ class Workload:
|
||||
with ENDPOINT_LOCK:
|
||||
self._endpoint.reconfigure()
|
||||
|
||||
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||
def go_readonly(self):
|
||||
self.stop()
|
||||
self._endpoint = self.make_endpoint(readonly=True, pageserver_id=None)
|
||||
self._endpoint.start(pageserver_id=None)
|
||||
|
||||
def make_endpoint(self, readonly: bool, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||
# We may be running alongside other Workloads for different tenants. Full TTID is
|
||||
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
|
||||
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
|
||||
|
||||
if readonly:
|
||||
self._endpoint_opts["hot_standby"] = True
|
||||
|
||||
return self.env.endpoints.create(
|
||||
self.branch_name,
|
||||
tenant_id=self.tenant_id,
|
||||
pageserver_id=pageserver_id,
|
||||
endpoint_id=endpoint_id,
|
||||
**self._endpoint_opts,
|
||||
)
|
||||
|
||||
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||
with ENDPOINT_LOCK:
|
||||
if self._endpoint is None:
|
||||
self._endpoint = self.env.endpoints.create(
|
||||
self.branch_name,
|
||||
tenant_id=self.tenant_id,
|
||||
pageserver_id=pageserver_id,
|
||||
endpoint_id=endpoint_id,
|
||||
**self._endpoint_opts,
|
||||
)
|
||||
self._endpoint = self.make_endpoint(pageserver_id=pageserver_id, readonly=False)
|
||||
|
||||
self._endpoint.start(pageserver_id=pageserver_id)
|
||||
else:
|
||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||
|
||||
@@ -11,6 +11,7 @@ from fixtures.neon_fixtures import (
|
||||
generate_uploads_and_deletions,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
@@ -412,3 +413,32 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool
|
||||
f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)"
|
||||
)
|
||||
assert res[0][0] == 1
|
||||
|
||||
|
||||
def test_image_layer_reads(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(256)
|
||||
workload.validate()
|
||||
|
||||
# wait_for_wal_insert_lsn(env, workload._endpoint, tenant_id, timeline_id)
|
||||
|
||||
workload.go_readonly()
|
||||
|
||||
commit_lsn = env.safekeepers[0].http_client().get_commit_lsn(tenant_id, timeline_id)
|
||||
wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, commit_lsn)
|
||||
log.info(f"Ingested up to commit_lsn {commit_lsn}")
|
||||
|
||||
env.pageserver.http_client().timeline_compact(
|
||||
tenant_id, timeline_id, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
# This should send getpage requests at the same LSN where we just created image layers
|
||||
workload.validate()
|
||||
|
||||
# Nothing should have written in the meantime
|
||||
assert commit_lsn == env.safekeepers[0].http_client().get_commit_lsn(tenant_id, timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user