mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 21:20:37 +00:00
Compare commits
12 Commits
release-67
...
jcsp/layer
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b58e9ef05b | ||
|
|
c2c9530ab7 | ||
|
|
c62f45fff4 | ||
|
|
6a9d51b41f | ||
|
|
7bae78186b | ||
|
|
7e560dd00e | ||
|
|
684e924211 | ||
|
|
8ace9ea25f | ||
|
|
6a4f49b08b | ||
|
|
c6e89445e2 | ||
|
|
04f32b9526 | ||
|
|
6f2333f52b |
10
.github/workflows/_build-and-test-locally.yml
vendored
10
.github/workflows/_build-and-test-locally.yml
vendored
@@ -257,7 +257,15 @@ jobs:
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a pg_install /tmp/neon/pg_install
|
||||
run: |
|
||||
# Use tar to copy files matching the pattern, preserving the paths in the destionation
|
||||
tar c \
|
||||
pg_install/v* \
|
||||
pg_install/build/*/src/test/regress/*.so \
|
||||
pg_install/build/*/src/test/regress/pg_regress \
|
||||
pg_install/build/*/src/test/isolation/isolationtester \
|
||||
pg_install/build/*/src/test/isolation/pg_isolation_regress \
|
||||
| tar x -C /tmp/neon
|
||||
|
||||
- name: Upload Neon artifact
|
||||
uses: ./.github/actions/upload
|
||||
|
||||
@@ -195,7 +195,7 @@ metrics:
|
||||
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These
|
||||
-- temporary snapshot files are renamed to the actual snapshot files after they are
|
||||
-- completely built. We only WAL-log the completely built snapshot files.
|
||||
(SELECT COUNT(*) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
(SELECT COUNT(*) FROM pg_ls_dir('pg_logical/snapshots') AS name WHERE name LIKE '%.snap') AS num_logical_snapshot_files;
|
||||
|
||||
# In all the below metrics, we cast LSNs to floats because Prometheus only supports floats.
|
||||
# It's probably fine because float64 can store integers from -2^53 to +2^53 exactly.
|
||||
@@ -244,4 +244,3 @@ metrics:
|
||||
SELECT slot_name,
|
||||
CASE WHEN wal_status = 'lost' THEN 1 ELSE 0 END AS wal_is_lost
|
||||
FROM pg_replication_slots;
|
||||
|
||||
|
||||
@@ -589,6 +589,10 @@ async fn timeline_create_handler(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg(e.to_string()),
|
||||
),
|
||||
Err(e @ tenant::CreateTimelineError::AncestorArchived) => json_response(
|
||||
StatusCode::NOT_ACCEPTABLE,
|
||||
HttpErrorBody::from_msg(e.to_string()),
|
||||
),
|
||||
Err(tenant::CreateTimelineError::ShuttingDown) => json_response(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
HttpErrorBody::from_msg("tenant shutting down".to_string()),
|
||||
|
||||
@@ -997,6 +997,7 @@ impl PageServerHandler {
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!("get_rel_page_at_lsn: {lsn}");
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
|
||||
@@ -563,6 +563,8 @@ pub enum CreateTimelineError {
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
AncestorNotActive,
|
||||
#[error("ancestor timeline is archived")]
|
||||
AncestorArchived,
|
||||
#[error("tenant shutting down")]
|
||||
ShuttingDown,
|
||||
#[error(transparent)]
|
||||
@@ -1698,6 +1700,11 @@ impl Tenant {
|
||||
return Err(CreateTimelineError::AncestorNotActive);
|
||||
}
|
||||
|
||||
if ancestor_timeline.is_archived() == Some(true) {
|
||||
info!("tried to branch archived timeline");
|
||||
return Err(CreateTimelineError::AncestorArchived);
|
||||
}
|
||||
|
||||
if let Some(lsn) = ancestor_start_lsn.as_mut() {
|
||||
*lsn = lsn.align();
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use itertools::Itertools;
|
||||
use super::storage_layer::LayerName;
|
||||
|
||||
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
|
||||
///
|
||||
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
|
||||
///
|
||||
/// ```plain
|
||||
|
||||
@@ -1470,4 +1470,52 @@ mod tests {
|
||||
LayerVisibilityHint::Visible
|
||||
));
|
||||
}
|
||||
|
||||
/// Exercise edge case of querying at exactly the LSN of an image layer
|
||||
#[test]
|
||||
fn layer_search_at_image_lsn() {
|
||||
let tenant_id = TenantId::generate();
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let last_record_lsn = Lsn::from_hex("00000000DEADBEEF").unwrap();
|
||||
|
||||
let mut layer_map = LayerMap::default();
|
||||
let mut updates = layer_map.batch_update();
|
||||
|
||||
let image_layer = PersistentLayerDesc {
|
||||
key_range: Key::from_i128(0)..Key::from_i128(i128::MAX),
|
||||
lsn_range: PersistentLayerDesc::image_layer_lsn_range(last_record_lsn),
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
is_delta: false,
|
||||
file_size: 123,
|
||||
};
|
||||
|
||||
let delta_layer = PersistentLayerDesc {
|
||||
key_range: Key::from_i128(0)..Key::from_i128(i128::MAX),
|
||||
lsn_range: Lsn(0)..Lsn(0xdead0000),
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
is_delta: true,
|
||||
file_size: 123,
|
||||
};
|
||||
|
||||
updates.insert_historic(image_layer.clone());
|
||||
updates.insert_historic(delta_layer);
|
||||
|
||||
updates.flush();
|
||||
|
||||
// FIXME: according to the search() docstring, it searches for layers with start LSNs _less then_
|
||||
// `end_lsn` -- i.e. it's correct that if you ask for exactly the LSN of an image layer, it shouldn't hit
|
||||
// it. However, the way that page_service calls it is to take the last_record_lsn of a Timeline
|
||||
// and pass that directly into LayerMap::search().
|
||||
|
||||
let searched = layer_map
|
||||
.search(Key::from_i128(12345), last_record_lsn)
|
||||
.unwrap();
|
||||
|
||||
// We searched at the LSN of the image layer: we should hit it
|
||||
assert_eq!(searched.layer.as_ref(), &image_layer);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -433,6 +433,7 @@ impl ReadableLayer {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
tracing::info!("get_values_reconstruct_data: {:?}", self.id());
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
layer
|
||||
|
||||
@@ -3601,7 +3601,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e))?;
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
@@ -3840,33 +3840,37 @@ impl Timeline {
|
||||
partition_size: u64,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<((KeyPartitioning, SparseKeyPartitioning), Lsn)> {
|
||||
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
|
||||
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
|
||||
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
|
||||
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
||||
// and hence before the compaction task starts.
|
||||
anyhow::bail!("repartition() called concurrently, this should not happen");
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"repartition() called concurrently, this should not happen"
|
||||
)));
|
||||
};
|
||||
let ((dense_partition, sparse_partition), partition_lsn) = &*partitioning_guard;
|
||||
if lsn < *partition_lsn {
|
||||
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"repartition() called with LSN going backwards, this should not happen"
|
||||
)));
|
||||
}
|
||||
|
||||
let distance = lsn.0 - partition_lsn.0;
|
||||
if *partition_lsn != Lsn(0)
|
||||
&& distance <= self.repartition_threshold
|
||||
&& !flags.contains(CompactFlags::ForceRepartition)
|
||||
{
|
||||
debug!(
|
||||
distance,
|
||||
threshold = self.repartition_threshold,
|
||||
"no repartitioning needed"
|
||||
);
|
||||
return Ok((
|
||||
(dense_partition.clone(), sparse_partition.clone()),
|
||||
*partition_lsn,
|
||||
));
|
||||
}
|
||||
// let distance = lsn.0 - partition_lsn.0;
|
||||
// if *partition_lsn != Lsn(0)
|
||||
// && distance <= self.repartition_threshold
|
||||
// && !flags.contains(CompactFlags::ForceRepartition)
|
||||
// {
|
||||
// debug!(
|
||||
// distance,
|
||||
// threshold = self.repartition_threshold,
|
||||
// "no repartitioning needed"
|
||||
// );
|
||||
// return Ok((
|
||||
// (dense_partition.clone(), sparse_partition.clone()),
|
||||
// *partition_lsn,
|
||||
// ));
|
||||
// }
|
||||
|
||||
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
|
||||
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
|
||||
@@ -4451,6 +4455,12 @@ pub(crate) enum CompactionError {
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
matches!(self, CompactionError::ShuttingDown)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CollectKeySpaceError> for CompactionError {
|
||||
fn from(err: CollectKeySpaceError) -> Self {
|
||||
match err {
|
||||
@@ -5769,6 +5779,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
/// the 'lsn' or anything older. The previous last record LSN is stored alongside
|
||||
/// the latest and can be read.
|
||||
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
|
||||
tracing::info!("finish_write @ {new_lsn}");
|
||||
self.tl.finish_write(new_lsn);
|
||||
}
|
||||
|
||||
|
||||
@@ -364,6 +364,10 @@ impl Timeline {
|
||||
// 3. Create new image layers for partitions that have been modified
|
||||
// "enough". Skip image layer creation if L0 compaction cannot keep up.
|
||||
if fully_compacted {
|
||||
tracing::info!(
|
||||
"create_image_layers @ {lsn} (latest {})",
|
||||
self.get_last_record_lsn()
|
||||
);
|
||||
let image_layers = self
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
@@ -390,7 +394,7 @@ impl Timeline {
|
||||
// error but continue.
|
||||
//
|
||||
// Suppress error when it's due to cancellation
|
||||
if !self.cancel.is_cancelled() {
|
||||
if !self.cancel.is_cancelled() && !err.is_cancelled() {
|
||||
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
}
|
||||
(1, false)
|
||||
|
||||
@@ -25,7 +25,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql neon--1.4--1.5.sql neon--1.5--1.4.sql
|
||||
DATA = \
|
||||
neon--1.0.sql \
|
||||
neon--1.0--1.1.sql \
|
||||
neon--1.1--1.2.sql \
|
||||
neon--1.2--1.3.sql \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
neon--1.3--1.2.sql \
|
||||
neon--1.2--1.1.sql \
|
||||
neon--1.1--1.0.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -642,9 +642,6 @@ class NeonEnvBuilder:
|
||||
patch_script = ""
|
||||
for ps in self.env.pageservers:
|
||||
patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';"
|
||||
# This is a temporary to get the backward compat test happy
|
||||
# since the compat snapshot was generated with an older version of neon local
|
||||
patch_script += f"UPDATE nodes SET availability_zone_id='{ps.az_id}' WHERE node_id = '{ps.id}' AND availability_zone_id IS NULL;"
|
||||
patch_script_path.write_text(patch_script)
|
||||
|
||||
# Update the config with info about tenants and timelines
|
||||
|
||||
@@ -56,20 +56,32 @@ class Workload:
|
||||
with ENDPOINT_LOCK:
|
||||
self._endpoint.reconfigure()
|
||||
|
||||
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||
def go_readonly(self):
|
||||
self.stop()
|
||||
self._endpoint = self.make_endpoint(readonly=True, pageserver_id=None)
|
||||
self._endpoint.start(pageserver_id=None)
|
||||
|
||||
def make_endpoint(self, readonly: bool, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||
# We may be running alongside other Workloads for different tenants. Full TTID is
|
||||
# obnoxiously long for use here, but a cut-down version is still unique enough for tests.
|
||||
endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
|
||||
|
||||
if readonly:
|
||||
self._endpoint_opts["hot_standby"] = True
|
||||
|
||||
return self.env.endpoints.create(
|
||||
self.branch_name,
|
||||
tenant_id=self.tenant_id,
|
||||
pageserver_id=pageserver_id,
|
||||
endpoint_id=endpoint_id,
|
||||
**self._endpoint_opts,
|
||||
)
|
||||
|
||||
def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
|
||||
with ENDPOINT_LOCK:
|
||||
if self._endpoint is None:
|
||||
self._endpoint = self.env.endpoints.create(
|
||||
self.branch_name,
|
||||
tenant_id=self.tenant_id,
|
||||
pageserver_id=pageserver_id,
|
||||
endpoint_id=endpoint_id,
|
||||
**self._endpoint_opts,
|
||||
)
|
||||
self._endpoint = self.make_endpoint(pageserver_id=pageserver_id, readonly=False)
|
||||
|
||||
self._endpoint.start(pageserver_id=pageserver_id)
|
||||
else:
|
||||
self._endpoint.reconfigure(pageserver_id=pageserver_id)
|
||||
|
||||
@@ -11,6 +11,7 @@ from fixtures.neon_fixtures import (
|
||||
generate_uploads_and_deletions,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
@@ -412,3 +413,42 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool
|
||||
f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)"
|
||||
)
|
||||
assert res[0][0] == 1
|
||||
|
||||
|
||||
def test_image_layer_reads(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
env.pageserver.http_client().set_tenant_config(
|
||||
tenant_id,
|
||||
{
|
||||
"compaction_period": "0s",
|
||||
},
|
||||
)
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(256)
|
||||
workload.validate()
|
||||
|
||||
workload.go_readonly()
|
||||
|
||||
commit_lsn = env.safekeepers[0].http_client().get_commit_lsn(tenant_id, timeline_id)
|
||||
wait_for_last_record_lsn(env.pageserver.http_client(), tenant_id, timeline_id, commit_lsn)
|
||||
log.info(f"Ingested up to commit_lsn {commit_lsn}")
|
||||
|
||||
env.pageserver.http_client().timeline_compact(
|
||||
tenant_id, timeline_id, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
# Uncomment this checkpoint, and the logs will show getpage requests hitting the image layers we
|
||||
# just created. However, without the checkpoint, getpage requests will hit one InMemoryLayer and
|
||||
# one persistent delta layer.
|
||||
# env.pageserver.http_client().timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)
|
||||
|
||||
# This should send getpage requests at the same LSN where we just created image layers
|
||||
workload.validate()
|
||||
|
||||
# Nothing should have written in the meantime
|
||||
assert commit_lsn == env.safekeepers[0].http_client().get_commit_lsn(tenant_id, timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user