mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
Compare commits
3 Commits
release-pr
...
jcsp/slrus
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6b1a577af | ||
|
|
7262096b74 | ||
|
|
6315b7b688 |
@@ -197,6 +197,7 @@ use utils::backoff::{
|
||||
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::pausable_failpoint;
|
||||
use utils::shard::ShardNumber;
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
@@ -2231,6 +2232,28 @@ impl RemoteTimelineClient {
|
||||
UploadQueue::Initialized(x) => x.no_pending_work(),
|
||||
}
|
||||
}
|
||||
|
||||
/// 'foreign' in the sense that it does not belong to this tenant shard. This method
|
||||
/// is used during GC for other shards to get the index of shard zero.
|
||||
pub(crate) async fn download_foreign_index(
|
||||
&self,
|
||||
shard_number: ShardNumber,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
|
||||
let foreign_shard_id = TenantShardId {
|
||||
shard_number,
|
||||
shard_count: self.tenant_shard_id.shard_count,
|
||||
tenant_id: self.tenant_shard_id.tenant_id,
|
||||
};
|
||||
download_index_part(
|
||||
&self.storage_impl,
|
||||
&foreign_shard_id,
|
||||
&self.timeline_id,
|
||||
Generation::MAX,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct UploadQueueAccessor<'a> {
|
||||
|
||||
@@ -38,6 +38,7 @@ use pageserver_api::{
|
||||
shard::{ShardIdentity, ShardNumber, TenantShardId},
|
||||
};
|
||||
use rand::Rng;
|
||||
use remote_storage::DownloadError;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::{
|
||||
@@ -4774,6 +4775,86 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn find_gc_time_cutoff(
|
||||
&self,
|
||||
pitr: Duration,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Lsn>, PageReconstructError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
if self.shard_identity.is_shard_zero() {
|
||||
// Shard Zero has SLRU data and can calculate the PITR time -> LSN mapping itself
|
||||
let now = SystemTime::now();
|
||||
let time_range = if pitr == Duration::ZERO {
|
||||
humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
|
||||
} else {
|
||||
pitr
|
||||
};
|
||||
|
||||
// If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
|
||||
let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
|
||||
let timestamp = to_pg_timestamp(time_cutoff);
|
||||
|
||||
let time_cutoff = match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
|
||||
LsnForTimestamp::Present(lsn) => Some(lsn),
|
||||
LsnForTimestamp::Future(lsn) => {
|
||||
// The timestamp is in the future. That sounds impossible,
|
||||
// but what it really means is that there hasn't been
|
||||
// any commits since the cutoff timestamp.
|
||||
//
|
||||
// In this case we should use the LSN of the most recent commit,
|
||||
// which is implicitly the last LSN in the log.
|
||||
debug!("future({})", lsn);
|
||||
Some(self.get_last_record_lsn())
|
||||
}
|
||||
LsnForTimestamp::Past(lsn) => {
|
||||
debug!("past({})", lsn);
|
||||
None
|
||||
}
|
||||
LsnForTimestamp::NoData(lsn) => {
|
||||
debug!("nodata({})", lsn);
|
||||
None
|
||||
}
|
||||
};
|
||||
Ok(time_cutoff)
|
||||
} else {
|
||||
// Shards other than shard zero cannot do timestamp->lsn lookups, and must instead learn their GC cutoff
|
||||
// from shard zero's index. The index doesn't explicitly tell us the time cutoff, but we may assume that
|
||||
// the point up to which shard zero's last_gc_cutoff has advanced will either be the time cutoff, or a
|
||||
// space cutoff that we would also have respected ourselves.
|
||||
match self
|
||||
.remote_client
|
||||
.download_foreign_index(ShardNumber(0), cancel)
|
||||
.await
|
||||
{
|
||||
Ok((index_part, index_generation, _index_mtime)) => {
|
||||
tracing::info!("GC loaded shard zero metadata (gen {index_generation:?}): latest_gc_cutoff_lsn: {}",
|
||||
index_part.metadata.latest_gc_cutoff_lsn());
|
||||
Ok(Some(index_part.metadata.latest_gc_cutoff_lsn()))
|
||||
}
|
||||
Err(DownloadError::NotFound) => {
|
||||
// This is unexpected, because during timeline creations shard zero persists to remote
|
||||
// storage before other shards are called, and during timeline deletion non-zeroth shards are
|
||||
// deleted before the zeroth one. However, it should be harmless: if we somehow end up in this
|
||||
// state, then shard zero should _eventually_ write an index when it GCs.
|
||||
tracing::warn!("GC couldn't find shard zero's index for timeline");
|
||||
Ok(None)
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO: this function should return a different error type than page reconstruct error
|
||||
Err(PageReconstructError::Other(anyhow::anyhow!(e)))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: after reading shard zero's GC cutoff, we should validate its generation with the storage
|
||||
// controller. Otherwise, it is possible that we see the GC cutoff go backwards while shard zero
|
||||
// is going through a migration if we read the old location's index and it has GC'd ahead of the
|
||||
// new location. This is legal in principle, but problematic in practice because it might result
|
||||
// in a timeline creation succeeding on shard zero ('s new location) but then failing on other shards
|
||||
// because they have GC'd past the branch point.
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the Lsns above which layer files need to be retained on
|
||||
/// garbage collection.
|
||||
///
|
||||
@@ -4816,40 +4897,7 @@ impl Timeline {
|
||||
// - if PITR interval is set, then this is our cutoff.
|
||||
// - if PITR interval is not set, then we do a lookup
|
||||
// based on DEFAULT_PITR_INTERVAL, so that size-based retention does not result in keeping history around permanently on idle databases.
|
||||
let time_cutoff = {
|
||||
let now = SystemTime::now();
|
||||
let time_range = if pitr == Duration::ZERO {
|
||||
humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
|
||||
} else {
|
||||
pitr
|
||||
};
|
||||
|
||||
// If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
|
||||
let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
|
||||
let timestamp = to_pg_timestamp(time_cutoff);
|
||||
|
||||
match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
|
||||
LsnForTimestamp::Present(lsn) => Some(lsn),
|
||||
LsnForTimestamp::Future(lsn) => {
|
||||
// The timestamp is in the future. That sounds impossible,
|
||||
// but what it really means is that there hasn't been
|
||||
// any commits since the cutoff timestamp.
|
||||
//
|
||||
// In this case we should use the LSN of the most recent commit,
|
||||
// which is implicitly the last LSN in the log.
|
||||
debug!("future({})", lsn);
|
||||
Some(self.get_last_record_lsn())
|
||||
}
|
||||
LsnForTimestamp::Past(lsn) => {
|
||||
debug!("past({})", lsn);
|
||||
None
|
||||
}
|
||||
LsnForTimestamp::NoData(lsn) => {
|
||||
debug!("nodata({})", lsn);
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
let time_cutoff = self.find_gc_time_cutoff(pitr, cancel, ctx).await?;
|
||||
|
||||
Ok(match (pitr, time_cutoff) {
|
||||
(Duration::ZERO, Some(time_cutoff)) => {
|
||||
|
||||
@@ -77,14 +77,16 @@ class MockS3Server:
|
||||
class LocalFsStorage:
|
||||
root: Path
|
||||
|
||||
def tenant_path(self, tenant_id: TenantId) -> Path:
|
||||
def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path:
|
||||
return self.root / "tenants" / str(tenant_id)
|
||||
|
||||
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
def timeline_path(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Path:
|
||||
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def timeline_latest_generation(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Optional[int]:
|
||||
timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
|
||||
index_parts = [f for f in timeline_files if f.startswith("index_part")]
|
||||
@@ -102,7 +104,9 @@ class LocalFsStorage:
|
||||
raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
|
||||
return generations[-1]
|
||||
|
||||
def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
def index_path(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Path:
|
||||
latest_gen = self.timeline_latest_generation(tenant_id, timeline_id)
|
||||
if latest_gen is None:
|
||||
filename = TIMELINE_INDEX_PART_FILE_NAME
|
||||
@@ -126,7 +130,9 @@ class LocalFsStorage:
|
||||
filename = f"{local_name}-{generation:08x}"
|
||||
return self.timeline_path(tenant_id, timeline_id) / filename
|
||||
|
||||
def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any:
|
||||
def index_content(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Any:
|
||||
with self.index_path(tenant_id, timeline_id).open("r") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
@@ -110,13 +110,15 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
|
||||
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
|
||||
|
||||
# Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
|
||||
# Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create.
|
||||
# There should have been compactions mid-test as well, this final check is in addition to those.
|
||||
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
|
||||
pageserver.http_client().timeline_checkpoint(
|
||||
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
pageserver.http_client().timeline_gc(shard, env.initial_timeline, None)
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
#
|
||||
|
||||
@@ -19,7 +19,7 @@ from fixtures.neon_fixtures import (
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, s3_storage
|
||||
from fixtures.utils import skip_in_debug_build, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from pytest_httpserver import HTTPServer
|
||||
@@ -1685,3 +1685,111 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
assert len(top["shards"]) == n_tenants - 4
|
||||
assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:])
|
||||
|
||||
|
||||
def test_sharding_gc(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Exercise GC in a sharded tenant: because only shard 0 holds SLRU content, it acts as
    the "leader" for GC, and other shards read its index to learn what LSN they should
    GC up to.

    The test runs GC on shard 1 first, then on shard 0, then on every non-zero shard, and
    after each step checks the `latest_gc_cutoff_lsn` recorded in the shard's index in
    (local filesystem) remote storage.
    """

    shard_count = 4
    neon_env_builder.num_pageservers = shard_count
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    tenant_conf = {
        # small checkpointing and compaction targets to ensure we generate many upload operations
        "checkpoint_distance": 128 * 1024,
        "compaction_threshold": 1,
        "compaction_target_size": 128 * 1024,
        # A short PITR horizon, so that we won't have to sleep too long in the test to wait
        # for it to happen.
        "pitr_interval": "1s",
        # disable background compaction and GC. We invoke it manually when we want it to happen.
        "gc_period": "0s",
        "compaction_period": "0s",
        # Disable automatic creation of image layers, as we will create them explicitly when
        # we want them
        "image_creation_threshold": 9999,
        "image_layer_creation_check_threshold": 0,
        "lsn_lease_length": "0s",
    }
    env = neon_env_builder.init_start(
        initial_tenant_shard_count=shard_count, initial_tenant_conf=tenant_conf
    )

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    # One id per shard of the initial tenant, indexed by shard number.
    shards = [TenantShardId(tenant_id, number, shard_count) for number in range(shard_count)]

    def run_gc(shard: TenantShardId) -> None:
        # Manually trigger GC on whichever pageserver holds this shard.
        env.get_tenant_pageserver(shard).http_client().timeline_gc(
            shard, timeline_id, gc_horizon=None
        )

    def remote_gc_cutoff(shard: TenantShardId) -> Lsn:
        # Read the GC cutoff LSN from the shard's index in remote storage.
        storage = env.pageserver_remote_storage
        assert isinstance(storage, LocalFsStorage)
        index = storage.index_content(tenant_id=shard, timeline_id=timeline_id)
        return Lsn(index["metadata_bytes"]["latest_gc_cutoff_lsn"])

    # Start a workload on the initial timeline and write some data
    workload = Workload(env, tenant_id, timeline_id)
    initial_lsn = Lsn(workload.endpoint().safe_psql("SELECT pg_current_wal_lsn()")[0][0])
    log.info(f"Started at LSN: {initial_lsn}")

    workload.init()

    # Write enough data to generate multiple layers
    for _ in range(10):
        last_lsn = workload.write_rows(32)

    assert last_lsn > initial_lsn

    log.info(f"Wrote up to last LSN: {last_lsn}")

    # Do full image layer generation. When we subsequently wait for PITR, all historic deltas
    # should be GC-able
    for shard in shards:
        env.get_tenant_pageserver(shard).http_client().timeline_compact(
            shard, timeline_id, force_image_layer_creation=True
        )

    workload.churn_rows(32)

    # Sleep for longer than the 1s pitr_interval configured above.
    time.sleep(5)

    # Invoke GC on a non-zero shard before shard zero has GC'd, then check shard 1's index:
    # its GC cutoff LSN must not be beyond the last LSN written before compaction.
    shard_one = shards[1]
    run_gc(shard_one)
    shard_1_gc_cutoff_lsn = remote_gc_cutoff(shard_one)
    log.info(f"Shard 1 cutoff LSN: {shard_1_gc_cutoff_lsn}")
    assert shard_1_gc_cutoff_lsn <= last_lsn

    # Now GC shard zero, and observe in remote storage that its GC cutoff LSN has reached at
    # least last_lsn.
    shard_zero = shards[0]
    run_gc(shard_zero)
    shard_0_gc_cutoff_lsn = remote_gc_cutoff(shard_zero)
    log.info(f"Shard 0 cutoff LSN: {shard_0_gc_cutoff_lsn}")
    assert shard_0_gc_cutoff_lsn >= last_lsn

    # Invoke GC on all other shards and verify their GC cutoff LSN matches shard 0's
    for shard_number, shard in enumerate(shards[1:], start=1):
        run_gc(shard)
        shard_gc_cutoff_lsn = remote_gc_cutoff(shard)
        log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}")
        assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn
|
||||
|
||||
Reference in New Issue
Block a user