Compare commits

..

1 Commits

Author SHA1 Message Date
John Spray
f75691533d storcon: avoid multiple initdbs when shard 0 has stale locations 2025-04-28 19:42:19 +02:00
17 changed files with 82 additions and 518 deletions

View File

@@ -3,5 +3,3 @@ pg_distrib_dir='/usr/local/'
listen_pg_addr='0.0.0.0:6400'
listen_http_addr='0.0.0.0:9898'
remote_storage={ endpoint='http://minio:9000', bucket_name='neon', bucket_region='eu-north-1', prefix_in_bucket='/pageserver' }
control_plane_api='http://0.0.0.0:6666' # No storage controller in docker compose, specify a junk address
control_plane_emergency_mode=true

View File

@@ -504,7 +504,7 @@ fn start_pageserver(
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver),
StorageControllerUpcallClient::new(conf, &shutdown_pageserver)?,
conf,
);
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());

View File

@@ -150,7 +150,7 @@ pub struct PageServerConf {
/// not terrible.
pub background_task_maximum_delay: Duration,
pub control_plane_api: Url,
pub control_plane_api: Option<Url>,
/// JWT token for use with the control plane API.
pub control_plane_api_token: Option<SecretString>,
@@ -438,8 +438,7 @@ impl PageServerConf {
test_remote_failures,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api: control_plane_api
.ok_or_else(|| anyhow::anyhow!("`control_plane_api` must be set"))?,
control_plane_api,
control_plane_emergency_mode,
heatmap_upload_concurrency,
secondary_download_concurrency,
@@ -574,7 +573,6 @@ impl PageServerConf {
background_task_maximum_delay: Duration::ZERO,
load_previous_heatmap: Some(true),
generate_unarchival_heatmap: Some(true),
control_plane_api: Some(Url::parse("http://localhost:6666").unwrap()),
..Default::default()
};
PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap()
@@ -643,12 +641,9 @@ mod tests {
use super::PageServerConf;
#[test]
fn test_minimal_config_toml_is_valid() {
// The minimal valid config for running a pageserver:
// - control_plane_api is mandatory, as pageservers cannot run in isolation
// - we use Default impl of everything else in this situation
fn test_empty_config_toml_is_valid() {
// we use Default impl of everything in this situation
let input = r#"
control_plane_api = "http://localhost:6666"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("empty config is valid");

View File

@@ -58,8 +58,14 @@ pub trait StorageControllerUpcallApi {
impl StorageControllerUpcallClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Self {
let mut url = conf.control_plane_api.clone();
pub fn new(
conf: &'static PageServerConf,
cancel: &CancellationToken,
) -> Result<Option<Self>, reqwest::Error> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return Ok(None),
};
if let Ok(mut segs) = url.path_segments_mut() {
// This ensures that `url` ends with a slash if it doesn't already.
@@ -79,17 +85,15 @@ impl StorageControllerUpcallClient {
}
for cert in &conf.ssl_ca_certs {
client = client.add_root_certificate(
Certificate::from_der(cert.contents()).expect("Invalid certificate in config"),
);
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
}
Self {
http_client: client.build().expect("Failed to construct HTTP client"),
Ok(Some(Self {
http_client: client.build()?,
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
}
}))
}
#[tracing::instrument(skip_all)]

View File

@@ -585,7 +585,7 @@ impl DeletionQueue {
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
pub fn new<C>(
remote_storage: GenericRemoteStorage,
controller_upcall_client: C,
controller_upcall_client: Option<C>,
conf: &'static PageServerConf,
) -> (Self, DeletionQueueWorkers<C>)
where
@@ -701,7 +701,7 @@ mod test {
async fn restart(&mut self) {
let (deletion_queue, workers) = DeletionQueue::new(
self.storage.clone(),
self.mock_control_plane.clone(),
Some(self.mock_control_plane.clone()),
self.harness.conf,
);
@@ -821,8 +821,11 @@ mod test {
let mock_control_plane = MockStorageController::new();
let (deletion_queue, worker) =
DeletionQueue::new(storage.clone(), mock_control_plane.clone(), harness.conf);
let (deletion_queue, worker) = DeletionQueue::new(
storage.clone(),
Some(mock_control_plane.clone()),
harness.conf,
);
let worker_join = worker.spawn_with(&tokio::runtime::Handle::current());

View File

@@ -53,7 +53,7 @@ where
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
// Client for calling into control plane API for validation of deletes
controller_upcall_client: C,
controller_upcall_client: Option<C>,
// DeletionLists which are waiting generation validation. Not safe to
// execute until [`validate`] has processed them.
@@ -86,7 +86,7 @@ where
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<ValidatorQueueMessage>,
tx: tokio::sync::mpsc::Sender<DeleterMessage>,
controller_upcall_client: C,
controller_upcall_client: Option<C>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
) -> Self {
@@ -137,16 +137,20 @@ where
return Ok(());
}
let tenants_valid = match self
.controller_upcall_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
// The only way a validation call returns an error is when the cancellation token fires
return Err(DeletionQueueError::ShuttingDown);
let tenants_valid = if let Some(controller_upcall_client) = &self.controller_upcall_client {
match controller_upcall_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
{
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
// The only way a validation call returns an error is when the cancellation token fires
return Err(DeletionQueueError::ShuttingDown);
}
}
} else {
// Control plane API disabled. In legacy mode we consider everything valid.
tenant_generations.keys().map(|k| (*k, true)).collect()
};
let mut validated_sequence: Option<u64> = None;

View File

@@ -1084,17 +1084,8 @@ impl Timeline {
let mut result = HashMap::new();
for (k, v) in kv {
let v = v?;
if v.is_empty() {
// This is a tombstone -- we can skip it.
// Originally, the replorigin code uses `Lsn::INVALID` to represent a tombstone. However, as it part of
// the sparse keyspace and the sparse keyspace uses an empty image to universally represent a tombstone,
// we also need to consider that. Such tombstones might be written on the detach ancestor code path to
// avoid the value going into the child branch. (See [`crate::tenant::timeline::detach_ancestor::generate_tombstone_image_layer`] for more details.)
continue;
}
let origin_id = k.field6 as RepOriginId;
let origin_lsn = Lsn::des(&v)
.with_context(|| format!("decode replorigin value for {}: {v:?}", origin_id))?;
let origin_lsn = Lsn::des(&v).unwrap();
if origin_lsn != Lsn::INVALID {
result.insert(origin_id, origin_lsn);
}
@@ -2587,11 +2578,6 @@ impl DatadirModification<'_> {
}
}
#[cfg(test)]
pub fn put_for_unit_test(&mut self, key: Key, val: Value) {
self.put(key, val);
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)

View File

@@ -4254,7 +4254,9 @@ impl TenantShard {
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> TenantShard {
assert!(!attached_conf.location.generation.is_none());
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
);
let (state, mut rx) = watch::channel(state);
@@ -5947,9 +5949,7 @@ mod tests {
use itertools::Itertools;
#[cfg(feature = "testing")]
use models::CompactLsnRange;
use pageserver_api::key::{
AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX, repl_origin_key,
};
use pageserver_api::key::{AUX_KEY_PREFIX, Key, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
use pageserver_api::keyspace::KeySpace;
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
@@ -8185,54 +8185,6 @@ mod tests {
assert_eq!(files.get("pg_logical/mappings/test2"), None);
}
#[tokio::test]
async fn test_repl_origin_tombstones() {
let harness = TenantHarness::create("test_repl_origin_tombstones")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let io_concurrency = IoConcurrency::spawn_for_test();
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
let repl_lsn = Lsn(0x10);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification.put_for_unit_test(repl_origin_key(2), Value::Image(Bytes::new()));
modification.set_replorigin(1, repl_lsn).await.unwrap();
modification.commit(&ctx).await.unwrap();
}
// we can read everything from the storage
let repl_origins = tline
.get_replorigins(lsn, &ctx, io_concurrency.clone())
.await
.unwrap();
assert_eq!(repl_origins.len(), 1);
assert_eq!(repl_origins[&1], lsn);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification.put_for_unit_test(
repl_origin_key(3),
Value::Image(Bytes::copy_from_slice(b"cannot_decode_this")),
);
modification.commit(&ctx).await.unwrap();
}
let result = tline
.get_replorigins(lsn, &ctx, io_concurrency.clone())
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation").await?;

View File

@@ -346,8 +346,7 @@ async fn init_load_generations(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else {
let client = StorageControllerUpcallClient::new(conf, cancel);
} else if let Some(client) = StorageControllerUpcallClient::new(conf, cancel)? {
info!("Calling {} API to re-attach tenants", client.base_url());
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach(conf).await {
@@ -361,6 +360,9 @@ async fn init_load_generations(
anyhow::bail!("Shut down while waiting for control plane re-attach response")
}
}
} else {
info!("Control plane API not configured, tenant generations are disabled");
return Ok(None);
};
// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
@@ -1151,8 +1153,17 @@ impl TenantManager {
// Testing hack: if we are configured with no control plane, then drop the generation
// from upserts. This enables creating generation-less tenants even though neon_local
// always uses generations when calling the location conf API.
let attached_conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
let attached_conf = if cfg!(feature = "testing") {
let mut conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
if self.conf.control_plane_api.is_none() {
conf.location.generation = Generation::none();
}
conf
} else {
AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?
};
let tenant = tenant_spawn(
self.conf,

View File

@@ -178,7 +178,7 @@ impl Attempt {
}
}
pub(crate) async fn generate_tombstone_image_layer(
async fn generate_tombstone_image_layer(
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,

View File

@@ -163,7 +163,8 @@ pub async fn doit(
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
.expect("storcon configured");
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,

View File

@@ -98,6 +98,7 @@
#define MB ((uint64)1024*1024)
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ >> lfc_chunk_size_log))
#define BLOCK_TO_CHUNK_OFF(blkno) ((blkno) & (lfc_blocks_per_chunk-1))
/*
@@ -134,15 +135,6 @@ typedef struct FileCacheEntry
#define N_COND_VARS 64
#define CV_WAIT_TIMEOUT 10
#define MAX_PREWARM_WORKERS 8
typedef struct PrewarmWorkerState
{
uint32 prewarmed_pages;
uint32 skipped_pages;
TimestampTz completed;
} PrewarmWorkerState;
typedef struct FileCacheControl
{
uint64 generation; /* generation is needed to handle correct hash
@@ -164,43 +156,25 @@ typedef struct FileCacheControl
dlist_head holes; /* double linked list of punched holes */
HyperLogLogState wss_estimation; /* estimation of working set size */
ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */
PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS];
size_t n_prewarm_workers;
size_t n_prewarm_entries;
size_t total_prewarm_pages;
size_t prewarm_batch;
bool prewarm_active;
bool prewarm_canceled;
dsm_handle prewarm_lfc_state_handle;
} FileCacheControl;
#define FILE_CACHE_STATE_MAGIC 0xfcfcfcfc
#define FILE_CACHE_STATE_BITMAP(fcs) ((uint8*)&(fcs)->chunks[(fcs)->n_chunks])
#define FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_chunks) (sizeof(FileCacheState) + (n_chunks)*sizeof(BufferTag) + (((n_chunks) * lfc_blocks_per_chunk)+7)/8)
#define FILE_CACHE_STATE_SIZE(fcs) (sizeof(FileCacheState) + (fcs->n_chunks)*sizeof(BufferTag) + (((fcs->n_chunks) << fcs->chunk_size_log)+7)/8)
bool lfc_store_prefetch_result;
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
static int lfc_prewarm_batch;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
bool lfc_store_prefetch_result;
bool lfc_prewarm_update_ws_estimation;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
/*
@@ -526,17 +500,6 @@ lfc_init(void)
NULL,
NULL);
DefineCustomBoolVariable("neon.prewarm_update_ws_estimation",
"Consider prewarmed pages for working set estimation",
NULL,
&lfc_prewarm_update_ws_estimation,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -587,32 +550,6 @@ lfc_init(void)
lfc_change_chunk_size,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_limit",
"Maximal number of prewarmed chunks",
NULL,
&lfc_prewarm_limit,
INT_MAX, /* no limit by default */
0,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("neon.file_cache_prewarm_batch",
"Number of pages retrivied by prewarm from page server",
NULL,
&lfc_prewarm_batch,
64,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
if (lfc_max_size == 0)
return;
@@ -626,314 +563,6 @@ lfc_init(void)
#endif
}
FileCacheState*
lfc_get_state(size_t max_entries)
{
FileCacheState* fcs = NULL;
if (lfc_maybe_disabled() || max_entries == 0) /* fast exit if file cache is disabled */
return NULL;
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
dlist_iter iter;
size_t i = 0;
uint8* bitmap;
size_t n_pages = 0;
size_t n_entries = Min(max_entries, lfc_ctl->used - lfc_ctl->pinned);
size_t state_size = FILE_CACHE_STATE_SIZE_FOR_CHUNKS(n_entries);
fcs = (FileCacheState*)palloc0(state_size);
SET_VARSIZE(fcs, state_size);
fcs->magic = FILE_CACHE_STATE_MAGIC;
fcs->chunk_size_log = lfc_chunk_size_log;
fcs->n_chunks = n_entries;
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
dlist_reverse_foreach(iter, &lfc_ctl->lru)
{
FileCacheEntry *entry = dlist_container(FileCacheEntry, list_node, iter.cur);
fcs->chunks[i] = entry->key;
for (int j = 0; j < lfc_blocks_per_chunk; j++)
{
if (GET_STATE(entry, j) != UNAVAILABLE)
{
BITMAP_SET(bitmap, i*lfc_blocks_per_chunk + j);
n_pages += 1;
}
}
if (++i == n_entries)
break;
}
Assert(i == n_entries);
fcs->n_pages = n_pages;
Assert(pg_popcount((char*)bitmap, ((n_entries << lfc_chunk_size_log) + 7)/8) == n_pages);
elog(LOG, "LFC: save state of %d chunks %d pages", (int)n_entries, (int)n_pages);
}
LWLockRelease(lfc_lock);
return fcs;
}
/*
* Prewarm LFC cache to the specified state. It uses lfc_prefetch function to load prewarmed page without hoilding shared buffer lock
* and avoid race conditions with other backends.
*/
void
lfc_prewarm(FileCacheState* fcs, uint32 n_workers)
{
size_t fcs_chunk_size_log;
size_t n_entries;
size_t prewarm_batch = Min(lfc_prewarm_batch, readahead_buffer_size);
size_t fcs_size;
dsm_segment *seg;
BackgroundWorkerHandle* bgw_handle[MAX_PREWARM_WORKERS];
if (!lfc_ensure_opened())
return;
if (prewarm_batch == 0 || lfc_prewarm_limit == 0 || n_workers == 0)
{
elog(LOG, "LFC: prewarm is disabled");
return;
}
if (n_workers > MAX_PREWARM_WORKERS)
{
elog(ERROR, "LFC: Too much prewarm workers, maximum is %d", MAX_PREWARM_WORKERS);
}
if (fcs == NULL || fcs->n_chunks == 0)
{
elog(LOG, "LFC: nothing to prewarm");
return;
}
if (fcs->magic != FILE_CACHE_STATE_MAGIC)
{
elog(ERROR, "LFC: Invalid file cache state magic: %X", fcs->magic);
}
fcs_size = VARSIZE(fcs);
if (FILE_CACHE_STATE_SIZE(fcs) != fcs_size)
{
elog(ERROR, "LFC: Invalid file cache state size: %u vs. %u", (unsigned)FILE_CACHE_STATE_SIZE(fcs), VARSIZE(fcs));
}
fcs_chunk_size_log = fcs->chunk_size_log;
if (fcs_chunk_size_log > MAX_BLOCKS_PER_CHUNK_LOG)
{
elog(ERROR, "LFC: Invalid chunk size log: %u", fcs->chunk_size_log);
}
n_entries = Min(fcs->n_chunks, lfc_prewarm_limit);
Assert(n_entries != 0);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
/* Do not prewarm more entries than LFC limit */
if (lfc_ctl->limit <= lfc_ctl->size)
{
elog(LOG, "LFC: skip prewarm because LFC is already filled");
LWLockRelease(lfc_lock);
return;
}
if (lfc_ctl->prewarm_active)
{
LWLockRelease(lfc_lock);
elog(ERROR, "LFC: skip prewarm because another prewarm is still active");
}
lfc_ctl->n_prewarm_entries = n_entries;
lfc_ctl->n_prewarm_workers = n_workers;
lfc_ctl->prewarm_active = true;
lfc_ctl->prewarm_canceled = false;
lfc_ctl->prewarm_batch = prewarm_batch;
memset(lfc_ctl->prewarm_workers, 0, n_workers*sizeof(PrewarmWorkerState));
LWLockRelease(lfc_lock);
/* Calculate total number of pages to be prewarmed */
lfc_ctl->total_prewarm_pages = fcs->n_pages;
seg = dsm_create(fcs_size, 0);
memcpy(dsm_segment_address(seg), fcs, fcs_size);
lfc_ctl->prewarm_lfc_state_handle = dsm_segment_handle(seg);
/* Spawn background workers */
for (uint32 i = 0; i < n_workers; i++)
{
BackgroundWorker worker = {0};
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy(worker.bgw_library_name, "neon");
strcpy(worker.bgw_function_name, "lfc_prewarm_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "LFC prewarm worker %d", i+1);
strcpy(worker.bgw_type, "LFC prewarm worker");
worker.bgw_main_arg = Int32GetDatum(i);
/* must set notify PID to wait for shutdown */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &bgw_handle[i]))
{
ereport(LOG,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("LFC: registering dynamic bgworker prewarm failed"),
errhint("Consider increasing the configuration parameter \"%s\".", "max_worker_processes")));
n_workers = i;
lfc_ctl->prewarm_canceled = true;
break;
}
}
for (uint32 i = 0; i < n_workers; i++)
{
while (true)
{
PG_TRY();
{
BgwHandleStatus status = WaitForBackgroundWorkerShutdown(bgw_handle[i]);
if (status != BGWH_STOPPED && status != BGWH_POSTMASTER_DIED)
{
elog(LOG, "LFC: Unexpected status of prewarm worker termination: %d", status);
}
break;
}
PG_CATCH();
{
elog(LOG, "LFC: cancel prewarm");
lfc_ctl->prewarm_canceled = true;
}
PG_END_TRY();
}
if (!lfc_ctl->prewarm_workers[i].completed)
{
/* Background worker doesn't set completion time: it means that it was abnormally terminated */
elog(LOG, "LFC: prewarm worker %d failed", i+1);
/* Set completion time to prevent get_prewarm_info from considering this worker as active */
lfc_ctl->prewarm_workers[i].completed = GetCurrentTimestamp();
}
}
dsm_detach(seg);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_ctl->prewarm_active = false;
LWLockRelease(lfc_lock);
}
void
lfc_prewarm_main(Datum main_arg)
{
size_t snd_idx = 0, rcv_idx = 0;
size_t n_sent = 0, n_received = 0;
size_t fcs_chunk_size_log;
size_t max_prefetch_pages;
size_t prewarm_batch;
size_t n_workers;
dsm_segment *seg;
FileCacheState* fcs;
uint8* bitmap;
BufferTag tag;
PrewarmWorkerState* ws;
uint32 worker_id = DatumGetInt32(main_arg);
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
seg = dsm_attach(lfc_ctl->prewarm_lfc_state_handle);
if (seg == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not map dynamic shared memory segment")));
fcs = (FileCacheState*) dsm_segment_address(seg);
prewarm_batch = lfc_ctl->prewarm_batch;
fcs_chunk_size_log = fcs->chunk_size_log;
n_workers = lfc_ctl->n_prewarm_workers;
max_prefetch_pages = lfc_ctl->n_prewarm_entries << fcs_chunk_size_log;
ws = &lfc_ctl->prewarm_workers[worker_id];
bitmap = FILE_CACHE_STATE_BITMAP(fcs);
/* enable prefetch in LFC */
lfc_store_prefetch_result = true;
lfc_do_prewarm = true; /* Flag for lfc_prefetch preventing replacement of existed entries if LFC cache is full */
elog(LOG, "LFC: worker %d start prewarming", worker_id);
while (!lfc_ctl->prewarm_canceled)
{
if (snd_idx < max_prefetch_pages)
{
if ((snd_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* If there are multiple workers, split chunks between them */
snd_idx += 1 << fcs_chunk_size_log;
}
else
{
if (BITMAP_ISSET(bitmap, snd_idx))
{
tag = fcs->chunks[snd_idx >> fcs_chunk_size_log];
tag.blockNum += snd_idx & ((1 << fcs_chunk_size_log) - 1);
if (!lfc_cache_contains(BufTagGetNRelFileInfo(tag), tag.forkNum, tag.blockNum))
{
(void)communicator_prefetch_register_bufferv(tag, NULL, 1, NULL);
n_sent += 1;
}
else
{
ws->skipped_pages += 1;
BITMAP_CLR(bitmap, snd_idx);
}
}
snd_idx += 1;
}
}
if (n_sent >= n_received + prewarm_batch || snd_idx == max_prefetch_pages)
{
if (n_received == n_sent && snd_idx == max_prefetch_pages)
{
break;
}
if ((rcv_idx >> fcs_chunk_size_log) % n_workers != worker_id)
{
/* Skip chunks processed by other workers */
rcv_idx += 1 << fcs_chunk_size_log;
continue;
}
/* Locate next block to prefetch */
while (!BITMAP_ISSET(bitmap, rcv_idx))
{
rcv_idx += 1;
}
tag = fcs->chunks[rcv_idx >> fcs_chunk_size_log];
tag.blockNum += rcv_idx & ((1 << fcs_chunk_size_log) - 1);
if (communicator_prefetch_receive(tag))
{
ws->prewarmed_pages += 1;
}
else
{
ws->skipped_pages += 1;
}
rcv_idx += 1;
n_received += 1;
}
}
/* No need to perform prefetch cleanup here because prewarm worker will be terminated and
* connection to PS dropped just after return from this function.
*/
Assert(n_sent == n_received || lfc_ctl->prewarm_canceled);
elog(LOG, "LFC: worker %d complete prewarming: loaded %ld pages", worker_id, (long)n_received);
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
/*
* Check if page is present in the cache.
* Returns true if page is found in local cache.
@@ -1372,11 +1001,8 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
* If we can't (e.g. because all other slots are being accessed)
* then we will remove this entry from the hash and continue
* on to the next chunk, as we may not exceed the limit.
*
* While prewarming LFC we do not want to replace existed entries,
* so we just stop prewarm is LFC cache is full.
*/
else if (!dlist_is_empty(&lfc_ctl->lru) && !lfc_do_prewarm)
else if (!dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node,
@@ -1400,7 +1026,6 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
/* Can't add this chunk - we don't have the space for it */
hash_search_with_hash_value(lfc_hash, &entry->key, hash,
HASH_REMOVE, NULL);
lfc_ctl->prewarm_canceled = true; /* cancel prewarm if LFC limit is reached */
return false;
}
@@ -1487,11 +1112,9 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (lfc_prewarm_update_ws_estimation)
{
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
}
tag.blockNum = blkno;
addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag)));
if (found)
{
state = GET_STATE(entry, chunk_offs);

View File

@@ -13,17 +13,6 @@
#include "neon_pgversioncompat.h"
typedef struct FileCacheState
{
int32 vl_len_; /* varlena header (do not touch directly!) */
uint32 magic;
uint32 n_chunks;
uint32 n_pages;
uint16 chunk_size_log;
BufferTag chunks[FLEXIBLE_ARRAY_MEMBER];
/* followed by bitmap */
} FileCacheState;
/* GUCs */
extern bool lfc_store_prefetch_result;
@@ -43,10 +32,7 @@ extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
extern FileCacheState* lfc_get_state(size_t max_entries);
extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers);
PGDLLEXPORT void lfc_prewarm_main(Datum main_arg);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

View File

@@ -3659,7 +3659,7 @@ impl Service {
locations: ShardMutationLocations,
http_client: reqwest::Client,
jwt: Option<String>,
create_req: TimelineCreateRequest,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
let latest = locations.latest.node;
@@ -3678,6 +3678,15 @@ impl Service {
.await
.map_err(|e| passthrough_api_error(&latest, e))?;
// If we are going to create the timeline on some stale locations for shard 0, then ask them to re-use
// the initdb generated by the latest location, rather than generating their own. This avoids racing uploads
// of initdb to S3 which might not be binary-identical if different pageservers have different postgres binaries.
if tenant_shard_id.is_shard_zero() {
if let models::TimelineCreateRequestMode::Bootstrap { existing_initdb_timeline_id, .. } = &mut create_req.mode {
*existing_initdb_timeline_id = Some(create_req.new_timeline_id);
}
}
// We propagate timeline creations to all attached locations such that a compute
// for the new timeline is able to start regardless of the current state of the
// tenant shard reconciliation.

View File

@@ -1194,7 +1194,8 @@ class NeonEnv:
else:
cfg["broker"]["listen_addr"] = self.broker.listen_addr()
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_api is not None:
cfg["control_plane_api"] = self.control_plane_api
if self.control_plane_hooks_api is not None:
cfg["control_plane_hooks_api"] = self.control_plane_hooks_api

View File

@@ -14,7 +14,7 @@ from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.paths import BASE_DIR
from fixtures.pg_config import PgConfigKey
from fixtures.utils import WITH_SANITIZERS, subprocess_capture
from fixtures.utils import subprocess_capture
from werkzeug.wrappers.response import Response
if TYPE_CHECKING:
@@ -148,15 +148,6 @@ def test_remote_extensions(
pg_config: PgConfig,
extension: RemoteExtension,
):
if WITH_SANITIZERS and extension is RemoteExtension.WITH_LIB:
pytest.skip(
"""
For this test to work with sanitizers enabled, we would need to
compile the dummy Postgres extension with the same CFLAGS that we
compile Postgres and the neon extension with to link the sanitizers.
"""
)
# Setup a mock nginx S3 gateway which will return our test extension.
(host, port) = httpserver_listen_address
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"

View File

@@ -3,7 +3,7 @@
Tests in this module exercise the pageserver's behavior around generation numbers,
as defined in docs/rfcs/025-generation-numbers.md. Briefly, the behaviors we require
of the pageserver are:
- Do not start a tenant without a generation number
- Do not start a tenant without a generation number if control_plane_api is set
- Remote objects must be suffixed with generation
- Deletions may only be executed after validating generation
- Updates to remote_consistent_lsn may only be made visible after validating generation