Fix race condition leading to panic in remote storage sync thread.

The SyncQueue consisted of a tokio mpsc channel and an atomic counter
that tracked how many items were in the channel. Updating the atomic
counter was racy: sometimes the consumer decremented the counter before
the producer had incremented it, wrapping the counter around to
usize::MAX. Calling Vec::with_capacity(usize::MAX) then panics.
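
For illustration only, here is a minimal sketch of the racy pattern (the
names and types are hypothetical, not the actual pageserver code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender};

// Simplified sketch: a channel plus a separately updated length counter.
struct RacyQueue {
    len: AtomicUsize,
    sender: Sender<u64>,
}

impl RacyQueue {
    fn push(&self, item: u64) {
        self.sender.send(item).unwrap();
        // (2) The producer increments the counter only after sending...
        self.len.fetch_add(1, Ordering::Relaxed);
    }
}

fn next_batch(receiver: &Receiver<u64>, len: &AtomicUsize) -> Vec<u64> {
    // (1) ...so the consumer can receive the item and decrement first,
    // taking the counter from 0 to usize::MAX.
    let first = receiver.recv().unwrap();
    len.fetch_sub(1, Ordering::Relaxed);
    // Using the (possibly wrapped) counter as a capacity hint then panics
    // with "capacity overflow" when it is usize::MAX.
    let mut batch = Vec::with_capacity(len.load(Ordering::Relaxed));
    batch.push(first);
    batch
}
```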

To fix, replace the channel with a VecDeque protected by a Mutex, plus a
condition variable for signaling. Now that the queue is protected by a
standard blocking Mutex and Condvar, the functions touching it are
refactored to be sync rather than async.
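
A minimal sketch of the pattern (illustrative only; the real SyncQueue in
the diff below also batches tasks per timeline and uses a wait timeout so
it can notice shutdown requests):

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

// The queue length is now simply VecDeque::len() under the same lock,
// so it can never disagree with the queue contents.
struct SyncQueue<T> {
    queue: Mutex<VecDeque<T>>,
    condvar: Condvar,
}

impl<T> SyncQueue<T> {
    fn push(&self, item: T) {
        let mut q = self.queue.lock().unwrap();
        q.push_back(item);
        self.condvar.notify_one();
    }

    /// Blocks until at least one item is available, then drains the queue.
    fn pop_all(&self) -> Vec<T> {
        let mut q = self.queue.lock().unwrap();
        while q.is_empty() {
            q = self.condvar.wait(q).unwrap();
        }
        q.drain(..).collect()
    }
}
```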

A theoretical downside is that the calls that push items to the queue,
and the storage sync thread that drains it, might now have to wait if
another thread is busy manipulating the queue. I believe that's OK: the
lock isn't held for long, and these operations happen in background
threads, not in the hot GetPage@LSN path, so they're not very
latency-sensitive.

Fixes #1719. Also add a test case.
Author: Heikki Linnakangas
Date:   2022-05-17 18:14:37 +03:00
Parent: f03779bf1a
Commit: 55ea3f262e

5 changed files with 208 additions and 141 deletions


@@ -9,7 +9,7 @@
//!
//! * public API via to interact with the external world:
//! * [`start_local_timeline_sync`] to launch a background async loop to handle the synchronization
//! * [`schedule_timeline_checkpoint_upload`] and [`schedule_timeline_download`] to enqueue a new upload and download tasks,
//! * [`schedule_layer_upload`], [`schedule_layer_download`], and[`schedule_layer_delete`] to enqueue a new task
//! to be processed by the async loop
//!
//! Here's a schematic overview of all interactions backup and the rest of the pageserver perform:
@@ -44,8 +44,8 @@
//! query their downloads later if they are accessed.
//!
//! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata.
//! If the storage sync loop was successfully started before, pageserver schedules the new checkpoint file uploads after every checkpoint.
//! The checkpoint uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either).
//! If the storage sync loop was successfully started before, pageserver schedules the layer files and the updated metadata file for upload, every time a layer is flushed to disk.
//! The uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either).
//! See [`crate::layered_repository`] for the upload calls and the adjacent logic.
//!
//! Synchronization logic is able to communicate back with updated timeline sync states, [`crate::repository::TimelineSyncStatusUpdate`],
@@ -54,7 +54,7 @@
//! * once after the sync loop startup, to signal pageserver which timelines will be synchronized in the near future
//! * after every loop step, in case a timeline needs to be reloaded or evicted from pageserver's memory
//!
//! When the pageserver terminates, the sync loop finishes a current sync task (if any) and exits.
//! When the pageserver terminates, the sync loop finishes current sync task (if any) and exits.
//!
//! The storage logic considers `image` as a set of local files (layers), fully representing a certain timeline at given moment (identified with `disk_consistent_lsn` from the corresponding `metadata` file).
//! Timeline can change its state, by adding more files on disk and advancing its `disk_consistent_lsn`: this happens after pageserver checkpointing and is followed
@@ -66,13 +66,13 @@
//! when the newer image is downloaded
//!
//! Pageserver maintains similar to the local file structure remotely: all layer files are uploaded with the same names under the same directory structure.
//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexShard`], containing the list of remote files.
//! Yet instead of keeping the `metadata` file remotely, we wrap it with more data in [`IndexPart`], containing the list of remote files.
//! This file gets read to populate the cache, if the remote timeline data is missing from it and gets updated after every successful download.
//! This way, we optimize S3 storage access by not running the `S3 list` command that could be expencive and slow: knowing both [`ZTenantId`] and [`ZTimelineId`],
//! we can always reconstruct the path to the timeline, use this to get the same path on the remote storage and retrive its shard contents, if needed, same as any layer files.
//!
//! By default, pageserver reads the remote storage index data only for timelines located locally, to synchronize those, if needed.
//! Bulk index data download happens only initially, on pageserer startup. The rest of the remote storage stays unknown to pageserver and loaded on demand only,
//! Bulk index data download happens only initially, on pageserver startup. The rest of the remote storage stays unknown to pageserver and loaded on demand only,
//! when a new timeline is scheduled for the download.
//!
//! NOTES:
@@ -89,13 +89,12 @@
//! Synchronization is done with the queue being emptied via separate thread asynchronously,
//! attempting to fully store pageserver's local data on the remote storage in a custom format, beneficial for storing.
//!
//! A queue is implemented in the [`sync_queue`] module as a pair of sender and receiver channels, to block on zero tasks instead of checking the queue.
//! The pair's shared buffer of a fixed size serves as an implicit queue, holding [`SyncTask`] for local files upload/download operations.
//! A queue is implemented in the [`sync_queue`] module as a VecDeque to hold the tasks, and a condition variable for blocking when the queue is empty.
//!
//! The queue gets emptied by a single thread with the loop, that polls the tasks in batches of deduplicated tasks.
//! A task from the batch corresponds to a single timeline, with its files to sync merged together: given that only one task sync loop step is active at a time,
//! timeline uploads and downloads can happen concurrently, in no particular order due to incremental nature of the timeline layers.
//! Deletion happens only after a successful upload only, otherwise the compation output might make the timeline inconsistent until both tasks are fully processed without errors.
//! Deletion happens only after a successful upload only, otherwise the compaction output might make the timeline inconsistent until both tasks are fully processed without errors.
//! Upload and download update the remote data (inmemory index and S3 json index part file) only after every layer is successfully synchronized, while the deletion task
//! does otherwise: it requires to have the remote data updated first succesfully: blob files will be invisible to pageserver this way.
//!
@@ -138,8 +137,6 @@
//! NOTE: No real contents or checksum check happens right now and is a subject to improve later.
//!
//! After the whole timeline is downloaded, [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function is used to update pageserver memory stage for the timeline processed.
//!
//! When pageserver signals shutdown, current sync task gets finished and the loop exists.
mod delete;
mod download;
@@ -153,10 +150,7 @@ use std::{
num::{NonZeroU32, NonZeroUsize},
ops::ControlFlow,
path::{Path, PathBuf},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
sync::{Arc, Condvar, Mutex},
};
use anyhow::{anyhow, bail, Context};
@@ -167,7 +161,6 @@ use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::{
fs,
runtime::Runtime,
sync::mpsc::{self, error::TryRecvError, UnboundedReceiver, UnboundedSender},
time::{Duration, Instant},
};
use tracing::*;
@@ -453,97 +446,77 @@ fn collect_timeline_files(
Ok((timeline_id, metadata, timeline_files))
}
/// Wraps mpsc channel bits around into a queue interface.
/// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning.
/// Global queue of sync tasks.
///
/// 'queue' is protected by a mutex, and 'condvar' is used to wait for tasks to arrive.
struct SyncQueue {
len: AtomicUsize,
max_timelines_per_batch: NonZeroUsize,
sender: UnboundedSender<(ZTenantTimelineId, SyncTask)>,
queue: Mutex<VecDeque<(ZTenantTimelineId, SyncTask)>>,
condvar: Condvar,
}
impl SyncQueue {
fn new(
max_timelines_per_batch: NonZeroUsize,
) -> (Self, UnboundedReceiver<(ZTenantTimelineId, SyncTask)>) {
let (sender, receiver) = mpsc::unbounded_channel();
(
Self {
len: AtomicUsize::new(0),
max_timelines_per_batch,
sender,
},
receiver,
)
fn new(max_timelines_per_batch: NonZeroUsize) -> Self {
Self {
max_timelines_per_batch,
queue: Mutex::new(VecDeque::new()),
condvar: Condvar::new(),
}
}
/// Queue a new task
fn push(&self, sync_id: ZTenantTimelineId, new_task: SyncTask) {
match self.sender.send((sync_id, new_task)) {
Ok(()) => {
self.len.fetch_add(1, Ordering::Relaxed);
}
Err(e) => {
error!("failed to push sync task to queue: {e}");
}
let mut q = self.queue.lock().unwrap();
q.push_back((sync_id, new_task));
if q.len() <= 1 {
self.condvar.notify_one();
}
}
/// Fetches a task batch, getting every existing entry from the queue, grouping by timelines and merging the tasks for every timeline.
/// A timeline has to care to not to delete cetain layers from the remote storage before the corresponding uploads happen.
/// Otherwise, due to "immutable" nature of the layers, the order of their deletion/uploading/downloading does not matter.
/// A timeline has to care to not to delete certain layers from the remote storage before the corresponding uploads happen.
/// Other than that, due to "immutable" nature of the layers, the order of their deletion/uploading/downloading does not matter.
/// Hence, we merge the layers together into single task per timeline and run those concurrently (with the deletion happening only after successful uploading).
async fn next_task_batch(
&self,
// The queue is based on two ends of a channel and has to be accessible statically without blocking for submissions from the sync code.
// Its receiver needs &mut, so we cannot place it in the same container with the other end and get both static and non-blocking access.
// Hence toss this around to use it from the sync loop directly as &mut.
sync_queue_receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
) -> HashMap<ZTenantTimelineId, SyncTaskBatch> {
// request the first task in blocking fashion to do less meaningless work
let (first_sync_id, first_task) = if let Some(first_task) = sync_queue_receiver.recv().await
{
self.len.fetch_sub(1, Ordering::Relaxed);
first_task
} else {
info!("Queue sender part was dropped, aborting");
return HashMap::new();
};
fn next_task_batch(&self) -> (HashMap<ZTenantTimelineId, SyncTaskBatch>, usize) {
// Wait for the first task in blocking fashion
let mut q = self.queue.lock().unwrap();
while q.is_empty() {
q = self
.condvar
.wait_timeout(q, Duration::from_millis(1000))
.unwrap()
.0;
if thread_mgr::is_shutdown_requested() {
return (HashMap::new(), q.len());
}
}
let (first_sync_id, first_task) = q.pop_front().unwrap();
let mut timelines_left_to_batch = self.max_timelines_per_batch.get() - 1;
let mut tasks_to_process = self.len();
let tasks_to_process = q.len();
let mut batches = HashMap::with_capacity(tasks_to_process);
batches.insert(first_sync_id, SyncTaskBatch::new(first_task));
let mut tasks_to_reenqueue = Vec::with_capacity(tasks_to_process);
// Pull the queue channel until we get all tasks that were there at the beginning of the batch construction.
// Greedily grab as many other tasks that we can.
// Yet do not put all timelines in the batch, but only the first ones that fit the timeline limit.
// Still merge the rest of the pulled tasks and reenqueue those for later.
while tasks_to_process > 0 {
match sync_queue_receiver.try_recv() {
Ok((sync_id, new_task)) => {
self.len.fetch_sub(1, Ordering::Relaxed);
tasks_to_process -= 1;
match batches.entry(sync_id) {
hash_map::Entry::Occupied(mut v) => v.get_mut().add(new_task),
hash_map::Entry::Vacant(v) => {
timelines_left_to_batch = timelines_left_to_batch.saturating_sub(1);
if timelines_left_to_batch == 0 {
tasks_to_reenqueue.push((sync_id, new_task));
} else {
v.insert(SyncTaskBatch::new(new_task));
}
}
// Re-enqueue the tasks that don't fit in this batch.
while let Some((sync_id, new_task)) = q.pop_front() {
match batches.entry(sync_id) {
hash_map::Entry::Occupied(mut v) => v.get_mut().add(new_task),
hash_map::Entry::Vacant(v) => {
timelines_left_to_batch = timelines_left_to_batch.saturating_sub(1);
if timelines_left_to_batch == 0 {
tasks_to_reenqueue.push((sync_id, new_task));
} else {
v.insert(SyncTaskBatch::new(new_task));
}
}
Err(TryRecvError::Disconnected) => {
debug!("Sender disconnected, batch collection aborted");
break;
}
Err(TryRecvError::Empty) => {
debug!("No more data in the sync queue, task batch is not full");
break;
}
}
}
@@ -553,14 +526,15 @@ impl SyncQueue {
tasks_to_reenqueue.len()
);
for (id, task) in tasks_to_reenqueue {
self.push(id, task);
q.push_back((id, task));
}
batches
(batches, q.len())
}
#[cfg(test)]
fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
self.queue.lock().unwrap().len()
}
}
@@ -823,7 +797,7 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
debug!("Download task for tenant {tenant_id}, timeline {timeline_id} sent")
}
/// Uses a remote storage given to start the storage sync loop.
/// Launch a thread to perform remote storage sync tasks.
/// See module docs for loop step description.
pub(super) fn spawn_storage_sync_thread<P, S>(
conf: &'static PageServerConf,
@@ -836,7 +810,7 @@ where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
let (sync_queue, sync_queue_receiver) = SyncQueue::new(max_concurrent_timelines_sync);
let sync_queue = SyncQueue::new(max_concurrent_timelines_sync);
SYNC_QUEUE
.set(sync_queue)
.map_err(|_queue| anyhow!("Could not initialize sync queue"))?;
@@ -864,7 +838,7 @@ where
local_timeline_files,
);
let loop_index = remote_index.clone();
let remote_index_clone = remote_index.clone();
thread_mgr::spawn(
ThreadKind::StorageSync,
None,
@@ -875,12 +849,7 @@ where
storage_sync_loop(
runtime,
conf,
(
Arc::new(storage),
loop_index,
sync_queue,
sync_queue_receiver,
),
(Arc::new(storage), remote_index_clone, sync_queue),
max_sync_errors,
);
Ok(())
@@ -896,12 +865,7 @@ where
fn storage_sync_loop<P, S>(
runtime: Runtime,
conf: &'static PageServerConf,
(storage, index, sync_queue, mut sync_queue_receiver): (
Arc<S>,
RemoteIndex,
&SyncQueue,
UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
),
(storage, index, sync_queue): (Arc<S>, RemoteIndex, &SyncQueue),
max_sync_errors: NonZeroU32,
) where
P: Debug + Send + Sync + 'static,
@@ -909,16 +873,35 @@ fn storage_sync_loop<P, S>(
{
info!("Starting remote storage sync loop");
loop {
let loop_index = index.clone();
let loop_storage = Arc::clone(&storage);
let (batched_tasks, remaining_queue_length) = sync_queue.next_task_batch();
if thread_mgr::is_shutdown_requested() {
info!("Shutdown requested, stopping");
break;
}
REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
if remaining_queue_length > 0 || !batched_tasks.is_empty() {
info!("Processing tasks for {} timelines in batch, more tasks left to process: {remaining_queue_length}", batched_tasks.len());
} else {
debug!("No tasks to process");
continue;
}
// Concurrently perform all the tasks in the batch
let loop_step = runtime.block_on(async {
tokio::select! {
step = loop_step(
step = process_batches(
conf,
(loop_storage, loop_index, sync_queue, &mut sync_queue_receiver),
max_sync_errors,
loop_storage,
&index,
batched_tasks,
sync_queue,
)
.instrument(info_span!("storage_sync_loop_step")) => step,
.instrument(info_span!("storage_sync_loop_step")) => ControlFlow::Continue(step),
_ = thread_mgr::shutdown_watcher() => ControlFlow::Break(()),
}
});
@@ -944,31 +927,18 @@ fn storage_sync_loop<P, S>(
}
}
async fn loop_step<P, S>(
async fn process_batches<P, S>(
conf: &'static PageServerConf,
(storage, index, sync_queue, sync_queue_receiver): (
Arc<S>,
RemoteIndex,
&SyncQueue,
&mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>,
),
max_sync_errors: NonZeroU32,
) -> ControlFlow<(), HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>>
storage: Arc<S>,
index: &RemoteIndex,
batched_tasks: HashMap<ZTenantTimelineId, SyncTaskBatch>,
sync_queue: &SyncQueue,
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>>
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
let batched_tasks = sync_queue.next_task_batch(sync_queue_receiver).await;
let remaining_queue_length = sync_queue.len();
REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
if remaining_queue_length > 0 || !batched_tasks.is_empty() {
info!("Processing tasks for {} timelines in batch, more tasks left to process: {remaining_queue_length}", batched_tasks.len());
} else {
debug!("No tasks to process");
return ControlFlow::Continue(HashMap::new());
}
let mut sync_results = batched_tasks
.into_iter()
.map(|(sync_id, batch)| {
@@ -993,6 +963,7 @@ where
ZTenantId,
HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
> = HashMap::new();
while let Some((sync_id, state_update)) = sync_results.next().await {
debug!("Finished storage sync task for sync id {sync_id}");
if let Some(state_update) = state_update {
@@ -1003,7 +974,7 @@ where
}
}
ControlFlow::Continue(new_timeline_states)
new_timeline_states
}
async fn process_sync_task_batch<P, S>(
@@ -1376,7 +1347,6 @@ where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
info!("Updating remote index for the timeline");
let updated_remote_timeline = {
let mut index_accessor = index.write().await;
@@ -1443,7 +1413,7 @@ where
IndexPart::from_remote_timeline(&timeline_path, updated_remote_timeline)
.context("Failed to create an index part from the updated remote timeline")?;
info!("Uploading remote data for the timeline");
info!("Uploading remote index for the timeline");
upload_index_part(conf, storage, sync_id, new_index_part)
.await
.context("Failed to upload new index part")
@@ -1685,7 +1655,7 @@ mod tests {
#[tokio::test]
async fn separate_task_ids_batch() {
let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
assert_eq!(sync_queue.len(), 0);
let sync_id_2 = ZTenantTimelineId {
@@ -1720,7 +1690,7 @@ mod tests {
let submitted_tasks_count = sync_queue.len();
assert_eq!(submitted_tasks_count, 3);
let mut batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await;
let (mut batch, _) = sync_queue.next_task_batch();
assert_eq!(
batch.len(),
submitted_tasks_count,
@@ -1746,7 +1716,7 @@ mod tests {
#[tokio::test]
async fn same_task_id_separate_tasks_batch() {
let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
assert_eq!(sync_queue.len(), 0);
let download = LayersDownload {
@@ -1769,7 +1739,7 @@ mod tests {
let submitted_tasks_count = sync_queue.len();
assert_eq!(submitted_tasks_count, 3);
let mut batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await;
let (mut batch, _) = sync_queue.next_task_batch();
assert_eq!(
batch.len(),
1,
@@ -1801,7 +1771,7 @@ mod tests {
#[tokio::test]
async fn same_task_id_same_tasks_batch() {
let (sync_queue, mut sync_queue_receiver) = SyncQueue::new(NonZeroUsize::new(1).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(1).unwrap());
let download_1 = LayersDownload {
layers_to_skip: HashSet::from([PathBuf::from("sk1")]),
};
@@ -1823,11 +1793,11 @@ mod tests {
sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_1.clone()));
sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_2.clone()));
sync_queue.push(sync_id_2, SyncTask::download(download_3.clone()));
sync_queue.push(sync_id_2, SyncTask::download(download_3));
sync_queue.push(TEST_SYNC_ID, SyncTask::download(download_4.clone()));
assert_eq!(sync_queue.len(), 4);
let mut smallest_batch = sync_queue.next_task_batch(&mut sync_queue_receiver).await;
let (mut smallest_batch, _) = sync_queue.next_task_batch();
assert_eq!(
smallest_batch.len(),
1,


@@ -119,7 +119,7 @@ mod tests {
#[tokio::test]
async fn delete_timeline_negative() -> anyhow::Result<()> {
let harness = RepoHarness::create("delete_timeline_negative")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(
tempdir()?.path().to_path_buf(),
@@ -152,7 +152,7 @@ mod tests {
#[tokio::test]
async fn delete_timeline() -> anyhow::Result<()> {
let harness = RepoHarness::create("delete_timeline")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a", "b", "c", "d"];


@@ -286,7 +286,7 @@ mod tests {
#[tokio::test]
async fn download_timeline() -> anyhow::Result<()> {
let harness = RepoHarness::create("download_timeline")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"];
@@ -385,7 +385,7 @@ mod tests {
#[tokio::test]
async fn download_timeline_negatives() -> anyhow::Result<()> {
let harness = RepoHarness::create("download_timeline_negatives")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;


@@ -240,7 +240,7 @@ mod tests {
#[tokio::test]
async fn regular_layer_upload() -> anyhow::Result<()> {
let harness = RepoHarness::create("regular_layer_upload")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a", "b"];
@@ -327,7 +327,7 @@ mod tests {
#[tokio::test]
async fn layer_upload_after_local_fs_update() -> anyhow::Result<()> {
let harness = RepoHarness::create("layer_upload_after_local_fs_update")?;
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a1", "b1"];


@@ -0,0 +1,97 @@
#
# Little stress test for the checkpointing and remote storage code.
#
# The test creates several tenants, and runs a simple workload on
# each tenant, in parallel. The test uses remote storage, and a tiny
# checkpoint_distance setting so that a lot of layer files are created.
#
import asyncio
from contextlib import closing
from uuid import UUID
import pytest
from fixtures.zenith_fixtures import ZenithEnvBuilder, ZenithEnv, Postgres, wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import lsn_from_hex
async def tenant_workload(env: ZenithEnv, pg: Postgres):
pageserver_conn = await env.pageserver.connect_async()
pg_conn = await pg.connect_async()
tenant_id = await pg_conn.fetchval("show zenith.zenith_tenant")
timeline_id = await pg_conn.fetchval("show zenith.zenith_timeline")
await pg_conn.execute("CREATE TABLE t(key int primary key, value text)")
for i in range(1, 100):
await pg_conn.execute(
f"INSERT INTO t SELECT {i}*1000 + g, 'payload' from generate_series(1,1000) g")
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
res = await pg_conn.fetchval("SELECT count(*) FROM t")
assert res == i * 1000
async def all_tenants_workload(env: ZenithEnv, tenants_pgs):
workers = []
for tenant, pg in tenants_pgs:
worker = tenant_workload(env, pg)
workers.append(asyncio.create_task(worker))
# await all workers
await asyncio.gather(*workers)
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_tenants_many(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
if storage_type == 'local_fs':
zenith_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
zenith_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
env = zenith_env_builder.init_start()
tenants_pgs = []
for i in range(1, 5):
# Use a tiny checkpoint distance, to create a lot of layers quickly
tenant, _ = env.zenith_cli.create_tenant(
conf={
'checkpoint_distance': '5000000',
})
env.zenith_cli.create_timeline(f'test_tenants_many', tenant_id=tenant)
pg = env.postgres.create_start(
f'test_tenants_many',
tenant_id=tenant,
)
tenants_pgs.append((tenant, pg))
asyncio.run(all_tenants_workload(env, tenants_pgs))
# Wait for the remote storage uploads to finish
pageserver_http = env.pageserver.http_client()
for tenant, pg in tenants_pgs:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("show zenith.zenith_tenant")
tenant_id = cur.fetchone()[0]
cur.execute("show zenith.zenith_timeline")
timeline_id = cur.fetchone()[0]
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
# wait until pageserver receives all the data
wait_for_last_record_lsn(pageserver_http, UUID(tenant_id), UUID(timeline_id), current_lsn)
# run final checkpoint manually to flush all the data to remote storage
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
wait_for_upload(pageserver_http, UUID(tenant_id), UUID(timeline_id), current_lsn)