mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 16:40:38 +00:00
Compare commits
10 Commits
yuchen/ben
...
fix/tests_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
94f5757450 | ||
|
|
46668775eb | ||
|
|
0e0d8276da | ||
|
|
e82f7f0dfc | ||
|
|
8173dc600a | ||
|
|
da1daa2426 | ||
|
|
9e3cb75bc7 | ||
|
|
5c41707bee | ||
|
|
cc37fa0f33 | ||
|
|
23f5a27146 |
@@ -220,6 +220,11 @@ impl AzureBlobStorage {
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
let outcome = match &download {
|
||||
Ok(_) => AttemptOutcome::Ok,
|
||||
// At this level in the stack 404 and 304 responses do not indicate an error.
|
||||
// There's expected cases when a blob may not exist or hasn't been modified since
|
||||
// the last get (e.g. probing for timeline indices and heatmap downloads).
|
||||
// Callers should handle errors if they are unexpected.
|
||||
Err(DownloadError::NotFound | DownloadError::Unmodified) => AttemptOutcome::Ok,
|
||||
Err(_) => AttemptOutcome::Err,
|
||||
};
|
||||
crate::metrics::BUCKET_METRICS
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod heavier_once_cell;
|
||||
|
||||
pub mod duplex;
|
||||
pub mod gate;
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
pub mod mpsc;
|
||||
@@ -1,36 +0,0 @@
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// A bi-directional channel.
|
||||
pub struct Duplex<S, R> {
|
||||
pub tx: mpsc::Sender<S>,
|
||||
pub rx: mpsc::Receiver<R>,
|
||||
}
|
||||
|
||||
/// Creates a bi-directional channel.
|
||||
///
|
||||
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
|
||||
/// attempts to send new messages will wait until a message is received from the channel.
|
||||
/// The provided buffer capacity must be at least 1.
|
||||
pub fn channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
|
||||
let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
|
||||
let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
|
||||
|
||||
(Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
|
||||
}
|
||||
|
||||
impl<S: Send, R: Send> Duplex<S, R> {
|
||||
/// Sends a value, waiting until there is capacity.
|
||||
///
|
||||
/// A successful send occurs when it is determined that the other end of the channel has not hung up already.
|
||||
pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
|
||||
self.tx.send(x).await
|
||||
}
|
||||
|
||||
/// Receives the next value for this receiver.
|
||||
///
|
||||
/// This method returns `None` if the channel has been closed and there are
|
||||
/// no remaining messages in the channel's buffer.
|
||||
pub async fn recv(&mut self) -> Option<R> {
|
||||
self.rx.recv().await
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@
|
||||
use crate::models::*;
|
||||
use crate::serialized_batch::SerializedValueBatch;
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::pg_constants;
|
||||
@@ -32,7 +33,8 @@ impl InterpretedWalRecord {
|
||||
FlushUncommittedRecords::No
|
||||
};
|
||||
|
||||
let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?;
|
||||
let metadata_record =
|
||||
MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?;
|
||||
let batch = SerializedValueBatch::from_decoded_filtered(
|
||||
decoded,
|
||||
shard,
|
||||
@@ -51,8 +53,13 @@ impl InterpretedWalRecord {
|
||||
}
|
||||
|
||||
impl MetadataRecord {
|
||||
fn from_decoded(
|
||||
/// Builds a metadata record for this WAL record, if any.
|
||||
///
|
||||
/// Only metadata records relevant for the given shard are emitted. Currently, most metadata
|
||||
/// records are broadcast to all shards for simplicity, but this should be improved.
|
||||
fn from_decoded_filtered(
|
||||
decoded: &DecodedWALRecord,
|
||||
shard: &ShardIdentity,
|
||||
next_record_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Option<MetadataRecord>> {
|
||||
@@ -61,26 +68,27 @@ impl MetadataRecord {
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
|
||||
match decoded.xl_rmid {
|
||||
// First, generate metadata records from the decoded WAL record.
|
||||
let mut metadata_record = match decoded.xl_rmid {
|
||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||
Self::decode_heapam_record(&mut buf, decoded, pg_version)
|
||||
Self::decode_heapam_record(&mut buf, decoded, pg_version)?
|
||||
}
|
||||
pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?,
|
||||
// Handle other special record types
|
||||
pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded),
|
||||
pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?,
|
||||
pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?,
|
||||
pg_constants::RM_TBLSPC_ID => {
|
||||
tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet");
|
||||
Ok(None)
|
||||
None
|
||||
}
|
||||
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
|
||||
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?,
|
||||
pg_constants::RM_XACT_ID => {
|
||||
Self::decode_xact_record(&mut buf, decoded, next_record_lsn)
|
||||
Self::decode_xact_record(&mut buf, decoded, next_record_lsn)?
|
||||
}
|
||||
pg_constants::RM_MULTIXACT_ID => {
|
||||
Self::decode_multixact_record(&mut buf, decoded, pg_version)
|
||||
Self::decode_multixact_record(&mut buf, decoded, pg_version)?
|
||||
}
|
||||
pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded),
|
||||
pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?,
|
||||
// This is an odd duck. It needs to go to all shards.
|
||||
// Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY
|
||||
// in WalIngest::new), we have to send the whole DecodedWalRecord::record to
|
||||
@@ -89,19 +97,48 @@ impl MetadataRecord {
|
||||
// Alternatively, one can make the checkpoint part of the subscription protocol
|
||||
// to the pageserver. This should work fine, but can be done at a later point.
|
||||
pg_constants::RM_XLOG_ID => {
|
||||
Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)
|
||||
Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)?
|
||||
}
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
Self::decode_logical_message_record(&mut buf, decoded)
|
||||
Self::decode_logical_message_record(&mut buf, decoded)?
|
||||
}
|
||||
pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded),
|
||||
pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded),
|
||||
pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?,
|
||||
pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?,
|
||||
_unexpected => {
|
||||
// TODO: consider failing here instead of blindly doing something without
|
||||
// understanding the protocol
|
||||
Ok(None)
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Next, filter the metadata record by shard.
|
||||
|
||||
// Route VM page updates to the shards that own them. VM pages are stored in the VM fork
|
||||
// of the main relation. These are sharded and managed just like regular relation pages.
|
||||
// See: https://github.com/neondatabase/neon/issues/9855
|
||||
if let Some(
|
||||
MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
|
||||
| MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
|
||||
) = metadata_record
|
||||
{
|
||||
let is_local_vm_page = |heap_blk| {
|
||||
let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
|
||||
shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
|
||||
};
|
||||
// Send the old and new VM page updates to their respective shards.
|
||||
clear_vm_bits.old_heap_blkno = clear_vm_bits
|
||||
.old_heap_blkno
|
||||
.filter(|&blkno| is_local_vm_page(blkno));
|
||||
clear_vm_bits.new_heap_blkno = clear_vm_bits
|
||||
.new_heap_blkno
|
||||
.filter(|&blkno| is_local_vm_page(blkno));
|
||||
// If neither VM page belongs to this shard, discard the record.
|
||||
if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none() {
|
||||
metadata_record = None
|
||||
}
|
||||
}
|
||||
|
||||
Ok(metadata_record)
|
||||
}
|
||||
|
||||
fn decode_heapam_record(
|
||||
|
||||
@@ -62,8 +62,10 @@ async fn ingest(
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let entered = gate.enter().unwrap();
|
||||
|
||||
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?;
|
||||
let layer =
|
||||
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
|
||||
|
||||
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
|
||||
let data_ser_size = data.serialized_size().unwrap() as usize;
|
||||
|
||||
@@ -1144,18 +1144,24 @@ pub(crate) mod mock {
|
||||
rx: tokio::sync::mpsc::UnboundedReceiver<ListWriterQueueMessage>,
|
||||
executor_rx: tokio::sync::mpsc::Receiver<DeleterMessage>,
|
||||
cancel: CancellationToken,
|
||||
executed: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl ConsumerState {
|
||||
async fn consume(&mut self, remote_storage: &GenericRemoteStorage) -> usize {
|
||||
let mut executed = 0;
|
||||
|
||||
async fn consume(&mut self, remote_storage: &GenericRemoteStorage) {
|
||||
info!("Executing all pending deletions");
|
||||
|
||||
// Transform all executor messages to generic frontend messages
|
||||
while let Ok(msg) = self.executor_rx.try_recv() {
|
||||
loop {
|
||||
use either::Either;
|
||||
let msg = tokio::select! {
|
||||
left = self.executor_rx.recv() => Either::Left(left),
|
||||
right = self.rx.recv() => Either::Right(right),
|
||||
};
|
||||
match msg {
|
||||
DeleterMessage::Delete(objects) => {
|
||||
Either::Left(None) => break,
|
||||
Either::Right(None) => break,
|
||||
Either::Left(Some(DeleterMessage::Delete(objects))) => {
|
||||
for path in objects {
|
||||
match remote_storage.delete(&path, &self.cancel).await {
|
||||
Ok(_) => {
|
||||
@@ -1165,18 +1171,13 @@ pub(crate) mod mock {
|
||||
error!("Failed to delete {path}, leaking object! ({e})");
|
||||
}
|
||||
}
|
||||
executed += 1;
|
||||
self.executed.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
DeleterMessage::Flush(flush_op) => {
|
||||
Either::Left(Some(DeleterMessage::Flush(flush_op))) => {
|
||||
flush_op.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while let Ok(msg) = self.rx.try_recv() {
|
||||
match msg {
|
||||
ListWriterQueueMessage::Delete(op) => {
|
||||
Either::Right(Some(ListWriterQueueMessage::Delete(op))) => {
|
||||
let mut objects = op.objects;
|
||||
for (layer, meta) in op.layers {
|
||||
objects.push(remote_layer_path(
|
||||
@@ -1198,33 +1199,27 @@ pub(crate) mod mock {
|
||||
error!("Failed to delete {path}, leaking object! ({e})");
|
||||
}
|
||||
}
|
||||
executed += 1;
|
||||
self.executed.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
ListWriterQueueMessage::Flush(op) => {
|
||||
Either::Right(Some(ListWriterQueueMessage::Flush(op))) => {
|
||||
op.notify();
|
||||
}
|
||||
ListWriterQueueMessage::FlushExecute(op) => {
|
||||
Either::Right(Some(ListWriterQueueMessage::FlushExecute(op))) => {
|
||||
// We have already executed all prior deletions because mock does them inline
|
||||
op.notify();
|
||||
}
|
||||
ListWriterQueueMessage::Recover(_) => {
|
||||
Either::Right(Some(ListWriterQueueMessage::Recover(_))) => {
|
||||
// no-op in mock
|
||||
}
|
||||
}
|
||||
info!("All pending deletions have been executed");
|
||||
}
|
||||
|
||||
executed
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MockDeletionQueue {
|
||||
tx: tokio::sync::mpsc::UnboundedSender<ListWriterQueueMessage>,
|
||||
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
|
||||
executed: Arc<AtomicUsize>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
consumer: std::sync::Mutex<ConsumerState>,
|
||||
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
|
||||
}
|
||||
|
||||
@@ -1235,29 +1230,34 @@ pub(crate) mod mock {
|
||||
|
||||
let executed = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let mut consumer = ConsumerState {
|
||||
rx,
|
||||
executor_rx,
|
||||
cancel: CancellationToken::new(),
|
||||
executed: executed.clone(),
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Some(remote_storage) = &remote_storage {
|
||||
consumer.consume(remote_storage).await;
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
tx,
|
||||
executor_tx,
|
||||
executed,
|
||||
remote_storage,
|
||||
consumer: std::sync::Mutex::new(ConsumerState {
|
||||
rx,
|
||||
executor_rx,
|
||||
cancel: CancellationToken::new(),
|
||||
}),
|
||||
lsn_table: Arc::new(std::sync::RwLock::new(VisibleLsnUpdates::new())),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::await_holding_lock)]
|
||||
pub async fn pump(&self) {
|
||||
if let Some(remote_storage) = &self.remote_storage {
|
||||
// Permit holding mutex across await, because this is only ever
|
||||
// called once at a time in tests.
|
||||
let mut locked = self.consumer.lock().unwrap();
|
||||
let count = locked.consume(remote_storage).await;
|
||||
self.executed.fetch_add(count, Ordering::Relaxed);
|
||||
}
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
self.executor_tx
|
||||
.send(DeleterMessage::Flush(FlushOp { tx }))
|
||||
.await
|
||||
.expect("Failed to send flush message");
|
||||
rx.await.ok();
|
||||
}
|
||||
|
||||
pub(crate) fn new_client(&self) -> DeletionQueueClient {
|
||||
|
||||
@@ -2144,6 +2144,7 @@ pub(crate) struct WalIngestMetrics {
|
||||
pub(crate) records_committed: IntCounter,
|
||||
pub(crate) records_filtered: IntCounter,
|
||||
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
|
||||
pub(crate) clear_vm_bits_unknown: IntCounterVec,
|
||||
}
|
||||
|
||||
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
|
||||
@@ -2172,6 +2173,12 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
|
||||
"Total number of zero gap blocks written on relation extends"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
clear_vm_bits_unknown: register_int_counter_vec!(
|
||||
"pageserver_wal_ingest_clear_vm_bits_unknown",
|
||||
"Number of ignored ClearVmBits operations due to unknown pages/relations",
|
||||
&["entity"],
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
});
|
||||
|
||||
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
|
||||
@@ -392,7 +392,9 @@ impl Timeline {
|
||||
result
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
/// Get size of a database in blocks. This is only accurate on shard 0. It will undercount on
|
||||
/// other shards, by only accounting for relations the shard has pages for, and only accounting
|
||||
/// for pages up to the highest page number it has stored.
|
||||
pub(crate) async fn get_db_size(
|
||||
&self,
|
||||
spcnode: Oid,
|
||||
@@ -411,7 +413,10 @@ impl Timeline {
|
||||
Ok(total_blocks)
|
||||
}
|
||||
|
||||
/// Get size of a relation file
|
||||
/// Get size of a relation file. The relation must exist, otherwise an error is returned.
|
||||
///
|
||||
/// This is only accurate on shard 0. On other shards, it will return the size up to the highest
|
||||
/// page number stored in the shard.
|
||||
pub(crate) async fn get_rel_size(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
@@ -447,7 +452,10 @@ impl Timeline {
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
/// Does relation exist?
|
||||
/// Does the relation exist?
|
||||
///
|
||||
/// Only shard 0 has a full view of the relations. Other shards only know about relations that
|
||||
/// the shard stores pages for.
|
||||
pub(crate) async fn get_rel_exists(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
@@ -481,6 +489,9 @@ impl Timeline {
|
||||
|
||||
/// Get a list of all existing relations in given tablespace and database.
|
||||
///
|
||||
/// Only shard 0 has a full view of the relations. Other shards only know about relations that
|
||||
/// the shard stores pages for.
|
||||
///
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
|
||||
@@ -3215,6 +3215,18 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
if let ShutdownMode::Reload = shutdown_mode {
|
||||
tracing::info!("Flushing deletion queue");
|
||||
if let Err(e) = self.deletion_queue_client.flush().await {
|
||||
match e {
|
||||
DeletionQueueError::ShuttingDown => {
|
||||
// This is the only error we expect for now. In the future, if more error
|
||||
// variants are added, we should handle them here.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We cancel the Tenant's cancellation token _after_ the timelines have all shut down. This permits
|
||||
// them to continue to do work during their shutdown methods, e.g. flushing data.
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
|
||||
@@ -8,8 +8,10 @@ use crate::page_cache;
|
||||
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
|
||||
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
|
||||
use crate::virtual_file::owned_buffers_io::write::Buffer;
|
||||
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
|
||||
use bytes::BytesMut;
|
||||
use camino::Utf8PathBuf;
|
||||
use num_traits::Num;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -18,7 +20,6 @@ use tracing::error;
|
||||
|
||||
use std::io;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
pub struct EphemeralFile {
|
||||
@@ -26,7 +27,10 @@ pub struct EphemeralFile {
|
||||
_timeline_id: TimelineId,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
bytes_written: u64,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<
|
||||
BytesMut,
|
||||
size_tracking_writer::Writer<VirtualFile>,
|
||||
>,
|
||||
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
@@ -38,9 +42,9 @@ impl EphemeralFile {
|
||||
conf: &PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<EphemeralFile> {
|
||||
) -> Result<EphemeralFile, io::Error> {
|
||||
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
@@ -51,17 +55,15 @@ impl EphemeralFile {
|
||||
"ephemeral-{filename_disambiguator}"
|
||||
)));
|
||||
|
||||
let file = Arc::new(
|
||||
VirtualFile::open_with_options_v2(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
|
||||
|
||||
@@ -71,12 +73,10 @@ impl EphemeralFile {
|
||||
page_cache_file_id,
|
||||
bytes_written: 0,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
|
||||
file,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
ctx,
|
||||
size_tracking_writer::Writer::new(file),
|
||||
BytesMut::with_capacity(TAIL_SZ),
|
||||
),
|
||||
_gate_guard: gate.enter()?,
|
||||
_gate_guard: gate_guard,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -85,7 +85,7 @@ impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// unlink the file
|
||||
// we are clear to do this, because we have entered a gate
|
||||
let path = self.buffered_writer.as_inner().path();
|
||||
let path = self.buffered_writer.as_inner().as_inner().path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
@@ -132,18 +132,6 @@ impl EphemeralFile {
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<u64> {
|
||||
let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
|
||||
if let Some(control) = control {
|
||||
control.release().await;
|
||||
}
|
||||
Ok(pos)
|
||||
}
|
||||
|
||||
async fn write_raw_controlled(
|
||||
&mut self,
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(u64, Option<owned_buffers_io::write::FlushControl>)> {
|
||||
let pos = self.bytes_written;
|
||||
|
||||
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
|
||||
@@ -157,9 +145,9 @@ impl EphemeralFile {
|
||||
})?;
|
||||
|
||||
// Write the payload
|
||||
let (nwritten, control) = self
|
||||
let nwritten = self
|
||||
.buffered_writer
|
||||
.write_buffered_borrowed_controlled(srcbuf, ctx)
|
||||
.write_buffered_borrowed(srcbuf, ctx)
|
||||
.await?;
|
||||
assert_eq!(
|
||||
nwritten,
|
||||
@@ -169,7 +157,7 @@ impl EphemeralFile {
|
||||
|
||||
self.bytes_written = new_bytes_written;
|
||||
|
||||
Ok((pos, control))
|
||||
Ok(pos)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,12 +168,11 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
ctx: &'a RequestContext,
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let submitted_offset = self.buffered_writer.bytes_submitted();
|
||||
let file_size_tracking_writer = self.buffered_writer.as_inner();
|
||||
let flushed_offset = file_size_tracking_writer.bytes_written();
|
||||
|
||||
let mutable = self.buffered_writer.inspect_mutable();
|
||||
let mutable = &mutable[0..mutable.pending()];
|
||||
|
||||
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
|
||||
let buffer = self.buffered_writer.inspect_buffer();
|
||||
let buffered = &buffer[0..buffer.pending()];
|
||||
|
||||
let dst_cap = dst.bytes_total().into_u64();
|
||||
let end = {
|
||||
@@ -210,42 +197,11 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let (written_range, maybe_flushed_range) = {
|
||||
if maybe_flushed.is_some() {
|
||||
// [ written ][ maybe_flushed ][ mutable ]
|
||||
// <- TAIL_SZ -><- TAIL_SZ ->
|
||||
// ^
|
||||
// `submitted_offset`
|
||||
// <++++++ on disk +++++++????????????????>
|
||||
(
|
||||
Range(
|
||||
start,
|
||||
std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
),
|
||||
Range(
|
||||
std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
std::cmp::min(end, submitted_offset),
|
||||
),
|
||||
)
|
||||
} else {
|
||||
// [ written ][ mutable ]
|
||||
// <- TAIL_SZ ->
|
||||
// ^
|
||||
// `submitted_offset`
|
||||
// <++++++ on disk +++++++++++++++++++++++>
|
||||
(
|
||||
Range(start, std::cmp::min(end, submitted_offset)),
|
||||
// zero len
|
||||
Range(submitted_offset, u64::MIN),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
|
||||
let written_range = Range(start, std::cmp::min(end, flushed_offset));
|
||||
let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
let file: &VirtualFile = self.buffered_writer.as_inner();
|
||||
let file: &VirtualFile = file_size_tracking_writer.as_inner();
|
||||
let bounds = dst.bounds();
|
||||
let slice = file
|
||||
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
|
||||
@@ -255,21 +211,19 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst
|
||||
};
|
||||
|
||||
let dst = if maybe_flushed_range.len() > 0 {
|
||||
let offset_in_buffer = maybe_flushed_range
|
||||
let dst = if buffered_range.len() > 0 {
|
||||
let offset_in_buffer = buffered_range
|
||||
.0
|
||||
.checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
|
||||
.checked_sub(flushed_offset)
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
// Checked previously the buffer is Some.
|
||||
let maybe_flushed = maybe_flushed.unwrap();
|
||||
let to_copy = &maybe_flushed
|
||||
[offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
|
||||
let to_copy =
|
||||
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
|
||||
let bounds = dst.bounds();
|
||||
let mut view = dst.slice({
|
||||
let start = written_range.len().into_usize();
|
||||
let end = start
|
||||
.checked_add(maybe_flushed_range.len().into_usize())
|
||||
.checked_add(buffered_range.len().into_usize())
|
||||
.unwrap();
|
||||
start..end
|
||||
});
|
||||
@@ -280,28 +234,6 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst
|
||||
};
|
||||
|
||||
let dst = if mutable_range.len() > 0 {
|
||||
let offset_in_buffer = mutable_range
|
||||
.0
|
||||
.checked_sub(submitted_offset)
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
let to_copy =
|
||||
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
|
||||
let bounds = dst.bounds();
|
||||
let mut view = dst.slice({
|
||||
let start =
|
||||
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
|
||||
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
|
||||
start..end
|
||||
});
|
||||
view.as_mut_rust_slice_full_zeroed()
|
||||
.copy_from_slice(to_copy);
|
||||
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
|
||||
} else {
|
||||
dst
|
||||
};
|
||||
|
||||
// TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
|
||||
|
||||
Ok((dst, (end - start).into_usize()))
|
||||
@@ -363,7 +295,7 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -394,15 +326,14 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file =
|
||||
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.inspect_mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
let cap = file.buffered_writer.inspect_buffer().capacity();
|
||||
|
||||
let write_nbytes = cap * 2 + cap / 2;
|
||||
let write_nbytes = cap + cap / 2;
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
@@ -410,39 +341,30 @@ mod tests {
|
||||
.collect();
|
||||
|
||||
let mut value_offsets = Vec::new();
|
||||
for range in (0..write_nbytes)
|
||||
.step_by(align)
|
||||
.map(|start| start..(start + align).min(write_nbytes))
|
||||
{
|
||||
let off = file.write_raw(&content[range], &ctx).await.unwrap();
|
||||
for i in 0..write_nbytes {
|
||||
let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
|
||||
value_offsets.push(off);
|
||||
}
|
||||
|
||||
assert_eq!(file.len() as usize, write_nbytes);
|
||||
for (i, range) in (0..write_nbytes)
|
||||
.step_by(align)
|
||||
.map(|start| start..(start + align).min(write_nbytes))
|
||||
.enumerate()
|
||||
{
|
||||
assert_eq!(value_offsets[i], range.start.into_u64());
|
||||
let buf = IoBufferMut::with_capacity(range.len());
|
||||
assert!(file.len() as usize == write_nbytes);
|
||||
for i in 0..write_nbytes {
|
||||
assert_eq!(value_offsets[i], i.into_u64());
|
||||
let buf = IoBufferMut::with_capacity(1);
|
||||
let (buf_slice, nread) = file
|
||||
.read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
|
||||
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let buf = buf_slice.into_inner();
|
||||
assert_eq!(nread, range.len());
|
||||
assert_eq!(&buf, &content[range]);
|
||||
assert_eq!(nread, 1);
|
||||
assert_eq!(&buf, &content[i..i + 1]);
|
||||
}
|
||||
|
||||
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
|
||||
assert!(file_contents == content[0..cap * 2]);
|
||||
let file_contents =
|
||||
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
|
||||
assert_eq!(file_contents, &content[0..cap]);
|
||||
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
|
||||
|
||||
let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
|
||||
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
let buffer_contents = file.buffered_writer.inspect_buffer();
|
||||
assert_eq!(buffer_contents, &content[cap..write_nbytes]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -451,16 +373,16 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file =
|
||||
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
|
||||
let cap = file.buffered_writer.inspect_mutable().capacity();
|
||||
let cap = file.buffered_writer.inspect_buffer().capacity();
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.take(cap + cap / 2)
|
||||
.collect();
|
||||
|
||||
file.write_raw(&content, &ctx).await.unwrap();
|
||||
@@ -468,21 +390,23 @@ mod tests {
|
||||
// assert the state is as this test expects it to be
|
||||
assert_eq!(
|
||||
&file.load_to_io_buf(&ctx).await.unwrap(),
|
||||
&content[0..cap * 2 + cap / 2]
|
||||
&content[0..cap + cap / 2]
|
||||
);
|
||||
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
|
||||
let md = file
|
||||
.buffered_writer
|
||||
.as_inner()
|
||||
.as_inner()
|
||||
.path()
|
||||
.metadata()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
md.len(),
|
||||
2 * cap.into_u64(),
|
||||
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
|
||||
cap.into_u64(),
|
||||
"buffered writer does one write if we write 1.5x buffer capacity"
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&content[cap..cap * 2]
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_mutable()[0..cap / 2],
|
||||
&content[cap * 2..cap * 2 + cap / 2]
|
||||
&file.buffered_writer.inspect_buffer()[0..cap / 2],
|
||||
&content[cap..cap + cap / 2]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -498,19 +422,19 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file =
|
||||
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cap = file.buffered_writer.inspect_buffer().capacity();
|
||||
|
||||
let mutable = file.buffered_writer.inspect_mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.take(cap + cap / 2)
|
||||
.collect();
|
||||
|
||||
let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
|
||||
file.write_raw(&content, &ctx).await.unwrap();
|
||||
|
||||
let test_read = |start: usize, len: usize| {
|
||||
let file = &file;
|
||||
@@ -530,38 +454,16 @@ mod tests {
|
||||
}
|
||||
};
|
||||
|
||||
let test_read_all_offset_combinations = || {
|
||||
async move {
|
||||
test_read(align, align).await;
|
||||
// border onto edge of file
|
||||
test_read(cap - align, align).await;
|
||||
// read across file and buffer
|
||||
test_read(cap - align, 2 * align).await;
|
||||
// stay from start of maybe flushed buffer
|
||||
test_read(cap, align).await;
|
||||
// completely within maybe flushed buffer
|
||||
test_read(cap + align, align).await;
|
||||
// border onto edge of maybe flushed buffer.
|
||||
test_read(cap * 2 - align, align).await;
|
||||
// read across maybe flushed and mutable buffer
|
||||
test_read(cap * 2 - align, 2 * align).await;
|
||||
// read across three segments
|
||||
test_read(cap - align, cap + 2 * align).await;
|
||||
// completely within mutable buffer
|
||||
test_read(cap * 2 + align, align).await;
|
||||
}
|
||||
};
|
||||
|
||||
// completely within the file range
|
||||
assert!(align < cap, "test assumption");
|
||||
assert!(cap % align == 0);
|
||||
|
||||
// test reads at different flush stages.
|
||||
let not_started = control.unwrap().into_not_started();
|
||||
test_read_all_offset_combinations().await;
|
||||
let in_progress = not_started.ready_to_flush();
|
||||
test_read_all_offset_combinations().await;
|
||||
in_progress.wait_until_flush_is_done().await;
|
||||
test_read_all_offset_combinations().await;
|
||||
assert!(20 < cap, "test assumption");
|
||||
test_read(10, 10).await;
|
||||
// border onto edge of file
|
||||
test_read(cap - 10, 10).await;
|
||||
// read across file and buffer
|
||||
test_read(cap - 10, 20).await;
|
||||
// stay from start of buffer
|
||||
test_read(cap, 10).await;
|
||||
// completely within buffer
|
||||
test_read(cap + 10, 10).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1960,7 +1960,7 @@ impl TenantManager {
|
||||
attempt.before_reset_tenant();
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, ShutdownMode::Flush).await {
|
||||
match tenant.shutdown(progress, ShutdownMode::Reload).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value().expect("it was just shutdown");
|
||||
}
|
||||
|
||||
@@ -681,7 +681,6 @@ impl RemoteTimelineClient {
|
||||
layer_file_name: &LayerName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
local_path: &Utf8Path,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
@@ -701,7 +700,6 @@ impl RemoteTimelineClient {
|
||||
layer_file_name,
|
||||
layer_metadata,
|
||||
local_path,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
@@ -27,7 +26,9 @@ use crate::span::{
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::tenant::Generation;
|
||||
use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
#[cfg_attr(target_os = "macos", allow(unused_imports))]
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -57,7 +58,6 @@ pub async fn download_layer_file<'a>(
|
||||
layer_file_name: &'a LayerName,
|
||||
layer_metadata: &'a LayerFileMetadata,
|
||||
local_path: &Utf8Path,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
@@ -86,9 +86,7 @@ pub async fn download_layer_file<'a>(
|
||||
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
|
||||
|
||||
let bytes_amount = download_retry(
|
||||
|| async {
|
||||
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
|
||||
},
|
||||
|| async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
)
|
||||
@@ -148,7 +146,6 @@ async fn download_object<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
src_path: &RemotePath,
|
||||
dst_path: &Utf8PathBuf,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: &CancellationToken,
|
||||
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
@@ -206,16 +203,13 @@ async fn download_object<'a>(
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
|
||||
use crate::virtual_file::owned_buffers_io;
|
||||
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
|
||||
use bytes::BytesMut;
|
||||
async {
|
||||
let destination_file = Arc::new(
|
||||
VirtualFile::create(dst_path, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("create a destination file for layer '{dst_path}'")
|
||||
})
|
||||
.map_err(DownloadError::Other)?,
|
||||
);
|
||||
let destination_file = VirtualFile::create(dst_path, ctx)
|
||||
.await
|
||||
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let mut download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
@@ -223,16 +217,14 @@ async fn download_object<'a>(
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
|
||||
destination_file,
|
||||
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
ctx,
|
||||
);
|
||||
|
||||
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
|
||||
// There's chunks_vectored() on the stream.
|
||||
let (bytes_amount, destination_file) = async {
|
||||
let size_tracking = size_tracking_writer::Writer::new(destination_file);
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
|
||||
size_tracking,
|
||||
BytesMut::with_capacity(super::BUFFER_SIZE),
|
||||
);
|
||||
while let Some(res) =
|
||||
futures::StreamExt::next(&mut download.download_stream).await
|
||||
{
|
||||
@@ -240,10 +232,10 @@ async fn download_object<'a>(
|
||||
Ok(chunk) => chunk,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
buffered.write_buffered_borrowed(&chunk, ctx).await?;
|
||||
buffered.write_buffered(chunk.slice_len(), ctx).await?;
|
||||
}
|
||||
let inner = buffered.flush_and_into_inner(ctx).await?;
|
||||
Ok(inner)
|
||||
let size_tracking = buffered.flush_and_into_inner(ctx).await?;
|
||||
Ok(size_tracking.into_inner())
|
||||
}
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -1181,7 +1181,6 @@ impl<'a> TenantDownloader<'a> {
|
||||
&layer.name,
|
||||
&layer.metadata,
|
||||
&local_path,
|
||||
&self.secondary_state.gate,
|
||||
&self.secondary_state.cancel,
|
||||
ctx,
|
||||
)
|
||||
|
||||
@@ -555,12 +555,13 @@ impl InMemoryLayer {
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_lsn: Lsn,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
|
||||
|
||||
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?;
|
||||
let file =
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
|
||||
let key = InMemoryLayerFileId(file.page_cache_file_id());
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
|
||||
@@ -1149,7 +1149,6 @@ impl LayerInner {
|
||||
&self.desc.layer_name(),
|
||||
&self.metadata(),
|
||||
&self.path,
|
||||
&timeline.gate,
|
||||
&timeline.cancel,
|
||||
ctx,
|
||||
)
|
||||
|
||||
@@ -894,10 +894,11 @@ pub(crate) enum ShutdownMode {
|
||||
/// While we are flushing, we continue to accept read I/O for LSNs ingested before
|
||||
/// the call to [`Timeline::shutdown`].
|
||||
FreezeAndFlush,
|
||||
/// Only flush the layers to the remote storage without freezing any open layers. This is the
|
||||
/// mode used by ancestor detach and any other operations that reloads a tenant but not increasing
|
||||
/// the generation number.
|
||||
Flush,
|
||||
/// Only flush the layers to the remote storage without freezing any open layers. Flush the deletion
|
||||
/// queue. This is the mode used by ancestor detach and any other operations that reloads a tenant
|
||||
/// but not increasing the generation number. Note that this mode cannot be used at tenant shutdown,
|
||||
/// as flushing the deletion queue at that time will cause shutdown-in-progress errors.
|
||||
Reload,
|
||||
/// Shut down immediately, without waiting for any open layers to flush.
|
||||
Hard,
|
||||
}
|
||||
@@ -1818,7 +1819,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
if let ShutdownMode::Flush = mode {
|
||||
if let ShutdownMode::Reload = mode {
|
||||
// drain the upload queue
|
||||
self.remote_client.shutdown().await;
|
||||
if !self.remote_client.no_pending_work() {
|
||||
@@ -3487,6 +3488,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut guard = self.layers.write().await;
|
||||
let gate_guard = self.gate.enter().context("enter gate for inmem layer")?;
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
ensure!(
|
||||
@@ -3503,7 +3505,7 @@ impl Timeline {
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&self.gate,
|
||||
gate_guard,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -182,7 +182,7 @@ impl OpenLayerManager {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
ensure!(lsn.is_aligned());
|
||||
@@ -212,9 +212,15 @@ impl OpenLayerManager {
|
||||
lsn
|
||||
);
|
||||
|
||||
let new_layer =
|
||||
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, &gate, ctx)
|
||||
.await?;
|
||||
let new_layer = InMemoryLayer::create(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
start_lsn,
|
||||
gate_guard,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let layer = Arc::new(new_layer);
|
||||
|
||||
self.layer_map.open_layer = Some(layer.clone());
|
||||
|
||||
@@ -58,7 +58,7 @@ pub(crate) async fn offload_timeline(
|
||||
}
|
||||
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
timeline.shutdown(super::ShutdownMode::Flush).await;
|
||||
timeline.shutdown(super::ShutdownMode::Reload).await;
|
||||
|
||||
// TODO extend guard mechanism above with method
|
||||
// to make deletions possible while offloading is in progress
|
||||
|
||||
@@ -20,7 +20,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
|
||||
use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
|
||||
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
|
||||
use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -63,6 +63,9 @@ pub(crate) mod owned_buffers_io {
|
||||
pub(crate) mod io_buf_ext;
|
||||
pub(crate) mod slice;
|
||||
pub(crate) mod write;
|
||||
pub(crate) mod util {
|
||||
pub(crate) mod size_tracking_writer;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -218,7 +221,7 @@ impl VirtualFile {
|
||||
self.inner.read_exact_at_page(page, offset, ctx).await
|
||||
}
|
||||
|
||||
pub async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
pub async fn write_all_at<Buf: IoBuf + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
@@ -1322,14 +1325,14 @@ impl Drop for VirtualFileInner {
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for VirtualFile {
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
#[inline(always)]
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<FullSlice<Buf>> {
|
||||
let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
|
||||
res.map(|_| buf)
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
|
||||
res.map(move |v| (v, buf))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1448,7 +1451,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
async fn write_all_at<Buf: IoBuf + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
@@ -1591,7 +1594,6 @@ mod tests {
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
file_a
|
||||
.write_all(b"foobar".to_vec().slice_len(), &ctx)
|
||||
.await?;
|
||||
@@ -1650,10 +1652,10 @@ mod tests {
|
||||
)
|
||||
.await?;
|
||||
file_b
|
||||
.write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
|
||||
.write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx)
|
||||
.await?;
|
||||
file_b
|
||||
.write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
|
||||
.write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx)
|
||||
.await?;
|
||||
|
||||
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
|
||||
|
||||
@@ -4,7 +4,7 @@ pub trait Alignment: std::marker::Unpin + 'static {
|
||||
}
|
||||
|
||||
/// Alignment at compile time.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[derive(Debug)]
|
||||
pub struct ConstAlign<const A: usize>;
|
||||
|
||||
impl<const A: usize> Alignment for ConstAlign<A> {
|
||||
@@ -14,7 +14,7 @@ impl<const A: usize> Alignment for ConstAlign<A> {
|
||||
}
|
||||
|
||||
/// Alignment at run time.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[derive(Debug)]
|
||||
pub struct RuntimeAlign {
|
||||
align: usize,
|
||||
}
|
||||
|
||||
@@ -3,10 +3,9 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut, ConstAlign};
|
||||
use super::{alignment::Alignment, raw::RawAlignedBuffer};
|
||||
|
||||
/// An shared, immutable aligned buffer type.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AlignedBuffer<A: Alignment> {
|
||||
/// Shared raw buffer.
|
||||
raw: Arc<RawAlignedBuffer<A>>,
|
||||
@@ -87,13 +86,6 @@ impl<A: Alignment> AlignedBuffer<A> {
|
||||
range: begin..end,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the mutable aligned buffer, if the immutable aligned buffer
|
||||
/// has exactly one strong reference. Otherwise returns `None`.
|
||||
pub fn into_mut(self) -> Option<AlignedBufferMut<A>> {
|
||||
let raw = Arc::into_inner(self.raw)?;
|
||||
Some(AlignedBufferMut::from_raw(raw))
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> Deref for AlignedBuffer<A> {
|
||||
@@ -116,14 +108,6 @@ impl<A: Alignment> PartialEq<[u8]> for AlignedBuffer<A> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<const A: usize, const N: usize> From<&[u8; N]> for AlignedBuffer<ConstAlign<A>> {
|
||||
fn from(value: &[u8; N]) -> Self {
|
||||
let mut buf = AlignedBufferMut::with_capacity(N);
|
||||
buf.extend_from_slice(value);
|
||||
buf.freeze()
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY: the underlying buffer references a stable memory region.
|
||||
unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBuffer<A> {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
use std::{
|
||||
mem::MaybeUninit,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use super::{
|
||||
alignment::{Alignment, ConstAlign},
|
||||
@@ -49,11 +46,6 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
|
||||
}
|
||||
|
||||
impl<A: Alignment> AlignedBufferMut<A> {
|
||||
/// Constructs a mutable aligned buffer from raw.
|
||||
pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
|
||||
AlignedBufferMut { raw }
|
||||
}
|
||||
|
||||
/// Returns the total number of bytes the buffer can hold.
|
||||
#[inline]
|
||||
pub fn capacity(&self) -> usize {
|
||||
@@ -136,39 +128,6 @@ impl<A: Alignment> AlignedBufferMut<A> {
|
||||
let len = self.len();
|
||||
AlignedBuffer::from_raw(self.raw, 0..len)
|
||||
}
|
||||
|
||||
/// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
|
||||
#[inline]
|
||||
pub fn extend_from_slice(&mut self, extend: &[u8]) {
|
||||
let cnt = extend.len();
|
||||
self.reserve(cnt);
|
||||
|
||||
// SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
|
||||
unsafe {
|
||||
let dst = self.spare_capacity_mut();
|
||||
// Reserved above
|
||||
debug_assert!(dst.len() >= cnt);
|
||||
|
||||
core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
|
||||
}
|
||||
// SAFETY: We do have at least `cnt` bytes remaining before advance.
|
||||
unsafe {
|
||||
bytes::BufMut::advance_mut(self, cnt);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
|
||||
#[inline]
|
||||
fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
|
||||
// SAFETY: we guarantees that the `Self::capacity()` bytes from
|
||||
// `Self::as_mut_ptr()` are allocated.
|
||||
unsafe {
|
||||
let ptr = self.as_mut_ptr().add(self.len());
|
||||
let len = self.capacity() - self.len();
|
||||
|
||||
core::slice::from_raw_parts_mut(ptr.cast(), len)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> Deref for AlignedBufferMut<A> {
|
||||
|
||||
@@ -1,15 +1,9 @@
|
||||
use tokio_epoll_uring::{IoBuf, IoBufMut};
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
|
||||
use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf};
|
||||
use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf};
|
||||
|
||||
/// A marker trait for a mutable aligned buffer type.
|
||||
pub trait IoBufAlignedMut: IoBufMut {}
|
||||
|
||||
/// A marker trait for an aligned buffer type.
|
||||
pub trait IoBufAligned: IoBuf {}
|
||||
|
||||
impl IoBufAlignedMut for IoBufferMut {}
|
||||
|
||||
impl IoBufAligned for IoBuffer {}
|
||||
|
||||
impl IoBufAlignedMut for PageWriteGuardBuf {}
|
||||
|
||||
@@ -5,8 +5,6 @@ use bytes::{Bytes, BytesMut};
|
||||
use std::ops::{Deref, Range};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
use super::write::CheapCloneForRead;
|
||||
|
||||
/// The true owned equivalent for Rust [`slice`]. Use this for the write path.
|
||||
///
|
||||
/// Unlike [`tokio_epoll_uring::Slice`], which we unfortunately inherited from `tokio-uring`,
|
||||
@@ -45,17 +43,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> CheapCloneForRead for FullSlice<B>
|
||||
where
|
||||
B: IoBuf + CheapCloneForRead,
|
||||
{
|
||||
fn cheap_clone(&self) -> Self {
|
||||
Self {
|
||||
slice: self.slice.get_ref().cheap_clone().slice_full(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait IoBufExt {
|
||||
/// Get a [`FullSlice`] for the entire buffer, i.e., `self[..]` or `self[0..self.len()]`.
|
||||
fn slice_len(self) -> FullSlice<Self>
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
virtual_file::owned_buffers_io::{io_buf_ext::FullSlice, write::OwnedAsyncWriter},
|
||||
};
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
|
||||
pub struct Writer<W> {
|
||||
dst: W,
|
||||
bytes_amount: u64,
|
||||
}
|
||||
|
||||
impl<W> Writer<W> {
|
||||
pub fn new(dst: W) -> Self {
|
||||
Self {
|
||||
dst,
|
||||
bytes_amount: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bytes_written(&self) -> u64 {
|
||||
self.bytes_amount
|
||||
}
|
||||
|
||||
pub fn as_inner(&self) -> &W {
|
||||
&self.dst
|
||||
}
|
||||
|
||||
/// Returns the wrapped `VirtualFile` object as well as the number
|
||||
/// of bytes that were written to it through this object.
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub fn into_inner(self) -> (u64, W) {
|
||||
(self.bytes_amount, self.dst)
|
||||
}
|
||||
}
|
||||
|
||||
impl<W> OwnedAsyncWriter for Writer<W>
|
||||
where
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
#[inline(always)]
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
let (nwritten, buf) = self.dst.write_all(buf, ctx).await?;
|
||||
self.bytes_amount += u64::try_from(nwritten).unwrap();
|
||||
Ok((nwritten, buf))
|
||||
}
|
||||
}
|
||||
@@ -1,89 +1,55 @@
|
||||
mod flush;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use flush::FlushHandle;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
virtual_file::{IoBuffer, IoBufferMut},
|
||||
};
|
||||
use crate::context::RequestContext;
|
||||
|
||||
use super::{
|
||||
io_buf_aligned::IoBufAligned,
|
||||
io_buf_ext::{FullSlice, IoBufExt},
|
||||
};
|
||||
|
||||
pub(crate) use flush::FlushControl;
|
||||
|
||||
pub(crate) trait CheapCloneForRead {
|
||||
/// Returns a cheap clone of the buffer.
|
||||
fn cheap_clone(&self) -> Self;
|
||||
}
|
||||
|
||||
impl CheapCloneForRead for IoBuffer {
|
||||
fn cheap_clone(&self) -> Self {
|
||||
// Cheap clone over an `Arc`.
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
use super::io_buf_ext::{FullSlice, IoBufExt};
|
||||
|
||||
/// A trait for doing owned-buffer write IO.
|
||||
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
|
||||
/// The owned buffers need to be aligned due to Direct IO requirements.
|
||||
pub trait OwnedAsyncWriter {
|
||||
fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)>;
|
||||
}
|
||||
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
|
||||
/// small writes into larger writes of size [`Buffer::cap`].
|
||||
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
|
||||
// since we would avoid copying majority of the data into the internal buffer.
|
||||
pub struct BufferedWriter<B: Buffer, W> {
|
||||
writer: Arc<W>,
|
||||
///
|
||||
/// # Passthrough Of Large Writers
|
||||
///
|
||||
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
|
||||
/// cause the internal buffer to be flushed prematurely so that the large
|
||||
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
|
||||
///
|
||||
/// This pass-through is generally beneficial for throughput, but if
|
||||
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
|
||||
/// unlimited large writes may cause latency or fairness issues.
|
||||
///
|
||||
/// In such cases, a different implementation that always buffers in memory
|
||||
/// may be preferable.
|
||||
pub struct BufferedWriter<B, W> {
|
||||
writer: W,
|
||||
/// invariant: always remains Some(buf) except
|
||||
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
/// - after an IO error => stays `None` forever
|
||||
///
|
||||
/// In these exceptional cases, it's `None`.
|
||||
mutable: Option<B>,
|
||||
/// A handle to the background flush task for writting data to disk.
|
||||
flush_handle: FlushHandle<B::IoBuf, W>,
|
||||
/// The number of bytes submitted to the background task.
|
||||
bytes_submitted: u64,
|
||||
buf: Option<B>,
|
||||
}
|
||||
|
||||
impl<B, Buf, W> BufferedWriter<B, W>
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
|
||||
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
|
||||
B: Buffer<IoBuf = Buf> + Send,
|
||||
Buf: IoBuf + Send,
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
/// Creates a new buffered writer.
|
||||
///
|
||||
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
|
||||
pub fn new(
|
||||
writer: Arc<W>,
|
||||
buf_new: impl Fn() -> B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> Self {
|
||||
pub fn new(writer: W, buf: B) -> Self {
|
||||
Self {
|
||||
writer: writer.clone(),
|
||||
mutable: Some(buf_new()),
|
||||
flush_handle: FlushHandle::spawn_new(
|
||||
writer,
|
||||
buf_new(),
|
||||
gate_guard,
|
||||
ctx.attached_child(),
|
||||
),
|
||||
bytes_submitted: 0,
|
||||
writer,
|
||||
buf: Some(buf),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,70 +57,87 @@ where
|
||||
&self.writer
|
||||
}
|
||||
|
||||
/// Returns the number of bytes submitted to the background flush task.
|
||||
pub fn bytes_submitted(&self) -> u64 {
|
||||
self.bytes_submitted
|
||||
}
|
||||
|
||||
/// Panics if used after any of the write paths returned an error
|
||||
pub fn inspect_mutable(&self) -> &B {
|
||||
self.mutable()
|
||||
}
|
||||
|
||||
/// Gets a reference to the maybe flushed read-only buffer.
|
||||
/// Returns `None` if the writer has not submitted any flush request.
|
||||
pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
|
||||
self.flush_handle.maybe_flushed.as_ref()
|
||||
pub fn inspect_buffer(&self) -> &B {
|
||||
self.buf()
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn flush_and_into_inner(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(u64, Arc<W>)> {
|
||||
pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result<W> {
|
||||
self.flush(ctx).await?;
|
||||
|
||||
let Self {
|
||||
mutable: buf,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
bytes_submitted: bytes_amount,
|
||||
} = self;
|
||||
flush_handle.shutdown().await?;
|
||||
let Self { buf, writer } = self;
|
||||
assert!(buf.is_some());
|
||||
Ok((bytes_amount, writer))
|
||||
Ok(writer)
|
||||
}
|
||||
|
||||
/// Gets a reference to the mutable in-memory buffer.
|
||||
#[inline(always)]
|
||||
fn mutable(&self) -> &B {
|
||||
self.mutable
|
||||
fn buf(&self) -> &B {
|
||||
self.buf
|
||||
.as_ref()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
pub async fn write_buffered_borrowed(
|
||||
/// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted.
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn write_buffered<S: IoBuf + Send>(
|
||||
&mut self,
|
||||
chunk: &[u8],
|
||||
chunk: FullSlice<S>,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<usize> {
|
||||
let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
|
||||
if let Some(control) = control {
|
||||
control.release().await;
|
||||
) -> std::io::Result<(usize, FullSlice<S>)> {
|
||||
let chunk = chunk.into_raw_slice();
|
||||
|
||||
let chunk_len = chunk.len();
|
||||
// avoid memcpy for the middle of the chunk
|
||||
if chunk.len() >= self.buf().cap() {
|
||||
self.flush(ctx).await?;
|
||||
// do a big write, bypassing `buf`
|
||||
assert_eq!(
|
||||
self.buf
|
||||
.as_ref()
|
||||
.expect("must not use after an error")
|
||||
.pending(),
|
||||
0
|
||||
);
|
||||
let (nwritten, chunk) = self
|
||||
.writer
|
||||
.write_all(FullSlice::must_new(chunk), ctx)
|
||||
.await?;
|
||||
assert_eq!(nwritten, chunk_len);
|
||||
return Ok((nwritten, chunk));
|
||||
}
|
||||
Ok(len)
|
||||
// in-memory copy the < BUFFER_SIZED tail of the chunk
|
||||
assert!(chunk.len() < self.buf().cap());
|
||||
let mut slice = &chunk[..];
|
||||
while !slice.is_empty() {
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = slice.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
buf.extend_from_slice(&slice[..n]);
|
||||
slice = &slice[n..];
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
self.flush(ctx).await?;
|
||||
}
|
||||
}
|
||||
assert!(slice.is_empty(), "by now we should have drained the chunk");
|
||||
Ok((chunk_len, FullSlice::must_new(chunk)))
|
||||
}
|
||||
|
||||
/// In addition to bytes submitted in this write, also returns a handle that can control the flush behavior.
|
||||
pub(crate) async fn write_buffered_borrowed_controlled(
|
||||
/// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
|
||||
///
|
||||
/// It is less performant because we always have to copy the borrowed data into the internal buffer
|
||||
/// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant
|
||||
/// for large writes.
|
||||
pub async fn write_buffered_borrowed(
|
||||
&mut self,
|
||||
mut chunk: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(usize, Option<FlushControl>)> {
|
||||
) -> std::io::Result<usize> {
|
||||
let chunk_len = chunk.len();
|
||||
let mut control: Option<FlushControl> = None;
|
||||
while !chunk.is_empty() {
|
||||
let buf = self.mutable.as_mut().expect("must not use after an error");
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = chunk.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
@@ -162,27 +145,26 @@ where
|
||||
chunk = &chunk[n..];
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
if let Some(control) = control.take() {
|
||||
control.release().await;
|
||||
}
|
||||
control = self.flush(ctx).await?;
|
||||
self.flush(ctx).await?;
|
||||
}
|
||||
}
|
||||
Ok((chunk_len, control))
|
||||
Ok(chunk_len)
|
||||
}
|
||||
|
||||
#[must_use = "caller must explcitly check the flush control"]
|
||||
async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<Option<FlushControl>> {
|
||||
let buf = self.mutable.take().expect("must not use after an error");
|
||||
async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> {
|
||||
let buf = self.buf.take().expect("must not use after an error");
|
||||
let buf_len = buf.pending();
|
||||
if buf_len == 0 {
|
||||
self.mutable = Some(buf);
|
||||
return Ok(None);
|
||||
self.buf = Some(buf);
|
||||
return Ok(());
|
||||
}
|
||||
let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?;
|
||||
self.bytes_submitted += u64::try_from(buf_len).unwrap();
|
||||
self.mutable = Some(recycled);
|
||||
Ok(Some(flush_control))
|
||||
let slice = buf.flush();
|
||||
let (nwritten, slice) = self.writer.write_all(slice, ctx).await?;
|
||||
assert_eq!(nwritten, buf_len);
|
||||
self.buf = Some(Buffer::reuse_after_flush(
|
||||
slice.into_raw_slice().into_inner(),
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -237,73 +219,37 @@ impl Buffer for BytesMut {
|
||||
}
|
||||
}
|
||||
|
||||
impl Buffer for IoBufferMut {
|
||||
type IoBuf = IoBuffer;
|
||||
|
||||
fn cap(&self) -> usize {
|
||||
self.capacity()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
IoBufferMut::extend_from_slice(self, other);
|
||||
}
|
||||
|
||||
fn pending(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn flush(self) -> FullSlice<Self::IoBuf> {
|
||||
self.freeze().slice_len()
|
||||
}
|
||||
|
||||
/// Caller should make sure that `iobuf` only have one strong reference before invoking this method.
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
|
||||
let mut recycled = iobuf
|
||||
.into_mut()
|
||||
.expect("buffer should only have one strong reference");
|
||||
recycled.clear();
|
||||
recycled
|
||||
impl OwnedAsyncWriter for Vec<u8> {
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
_: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
self.extend_from_slice(&buf[..]);
|
||||
Ok((buf.len(), buf))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Mutex;
|
||||
use bytes::BytesMut;
|
||||
|
||||
use super::*;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
#[derive(Default)]
|
||||
struct RecorderWriter {
|
||||
/// record bytes and write offsets.
|
||||
writes: Mutex<Vec<(Vec<u8>, u64)>>,
|
||||
writes: Vec<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl RecorderWriter {
|
||||
/// Gets recorded bytes and write offsets.
|
||||
fn get_writes(&self) -> Vec<Vec<u8>> {
|
||||
self.writes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|(buf, _)| buf.clone())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for RecorderWriter {
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
_: &RequestContext,
|
||||
) -> std::io::Result<FullSlice<Buf>> {
|
||||
self.writes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((Vec::from(&buf[..]), offset));
|
||||
Ok(buf)
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
self.writes.push(Vec::from(&buf[..]));
|
||||
Ok((buf.len(), buf))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,21 +257,71 @@ mod tests {
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
|
||||
}
|
||||
|
||||
macro_rules! write {
|
||||
($writer:ident, $data:literal) => {{
|
||||
$writer
|
||||
.write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx())
|
||||
.await?;
|
||||
}};
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
|
||||
async fn test_buffered_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"b");
|
||||
write!(writer, b"c");
|
||||
write!(writer, b"d");
|
||||
write!(writer, b"e");
|
||||
let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"abc");
|
||||
write!(writer, b"de");
|
||||
write!(writer, b"");
|
||||
write!(writer, b"fghijk");
|
||||
let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"bc");
|
||||
write!(writer, b"d");
|
||||
write!(writer, b"e");
|
||||
let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
|
||||
let ctx = test_ctx();
|
||||
let ctx = &ctx;
|
||||
let recorder = Arc::new(RecorderWriter::default());
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
|
||||
recorder,
|
||||
|| IoBufferMut::with_capacity(2),
|
||||
gate.enter()?,
|
||||
ctx,
|
||||
);
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
|
||||
writer.write_buffered_borrowed(b"abc", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"d", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"e", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"fg", ctx).await?;
|
||||
@@ -333,9 +329,9 @@ mod tests {
|
||||
writer.write_buffered_borrowed(b"j", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"klmno", ctx).await?;
|
||||
|
||||
let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
|
||||
let recorder = writer.flush_and_into_inner(ctx).await?;
|
||||
assert_eq!(
|
||||
recorder.get_writes(),
|
||||
recorder.writes,
|
||||
{
|
||||
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
|
||||
expect
|
||||
|
||||
@@ -1,309 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use utils::sync::duplex;
|
||||
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice},
|
||||
};
|
||||
|
||||
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
|
||||
|
||||
/// A handle to the flush task.
|
||||
pub struct FlushHandle<Buf, W> {
|
||||
inner: Option<FlushHandleInner<Buf, W>>,
|
||||
/// Immutable buffer for serving tail reads.
|
||||
/// `None` if no flush request has been submitted.
|
||||
pub(super) maybe_flushed: Option<FullSlice<Buf>>,
|
||||
}
|
||||
|
||||
pub struct FlushHandleInner<Buf, W> {
|
||||
/// A bi-directional channel that sends (buffer, offset) for writes,
|
||||
/// and receives recyled buffer.
|
||||
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
|
||||
/// Join handle for the background flush task.
|
||||
join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
|
||||
}
|
||||
|
||||
struct FlushRequest<Buf> {
|
||||
slice: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
#[cfg(test)]
|
||||
ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
#[cfg(test)]
|
||||
done_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
}
|
||||
|
||||
/// Constructs a request and a control object for a new flush operation.
|
||||
#[cfg(not(test))]
|
||||
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
|
||||
let request = FlushRequest { slice, offset };
|
||||
let control = FlushControl::untracked();
|
||||
|
||||
(request, control)
|
||||
}
|
||||
|
||||
/// Constructs a request and a control object for a new flush operation.
|
||||
#[cfg(test)]
|
||||
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
|
||||
let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
|
||||
let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
|
||||
let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
|
||||
|
||||
let request = FlushRequest {
|
||||
slice,
|
||||
offset,
|
||||
ready_to_flush_rx,
|
||||
done_flush_tx,
|
||||
};
|
||||
(request, control)
|
||||
}
|
||||
|
||||
/// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushControl {
|
||||
not_started: FlushNotStarted,
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) struct FlushControl;
|
||||
|
||||
impl FlushControl {
|
||||
#[cfg(test)]
|
||||
fn not_started(
|
||||
ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
FlushControl {
|
||||
not_started: FlushNotStarted {
|
||||
ready_to_flush_tx,
|
||||
done_flush_rx,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
fn untracked() -> Self {
|
||||
FlushControl
|
||||
}
|
||||
|
||||
/// In tests, turn flush control into a not started state.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn into_not_started(self) -> FlushNotStarted {
|
||||
self.not_started
|
||||
}
|
||||
|
||||
/// Release control to the submitted buffer.
|
||||
///
|
||||
/// In `cfg(test)` environment, the buffer is guranteed to be flushed to disk after [`FlushControl::release`] is finishes execution.
|
||||
pub async fn release(self) {
|
||||
#[cfg(test)]
|
||||
{
|
||||
self.not_started
|
||||
.ready_to_flush()
|
||||
.wait_until_flush_is_done()
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Buf, W> FlushHandle<Buf, W>
|
||||
where
|
||||
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
|
||||
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
|
||||
{
|
||||
/// Spawns a new background flush task and obtains a handle.
|
||||
///
|
||||
/// Note: The background task so we do not need to explicitly maintain a queue of buffers.
|
||||
pub fn spawn_new<B>(
|
||||
file: Arc<W>,
|
||||
buf: B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: RequestContext,
|
||||
) -> Self
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
{
|
||||
let (front, back) = duplex::mpsc::channel(2);
|
||||
|
||||
let join_handle = tokio::spawn(async move {
|
||||
FlushBackgroundTask::new(back, file, gate_guard, ctx)
|
||||
.run(buf.flush())
|
||||
.await
|
||||
});
|
||||
|
||||
FlushHandle {
|
||||
inner: Some(FlushHandleInner {
|
||||
channel: front,
|
||||
join_handle,
|
||||
}),
|
||||
maybe_flushed: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Submits a buffer to be flushed in the background task.
|
||||
/// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
|
||||
/// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
|
||||
/// clear `maybe_flushed`.
|
||||
pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
{
|
||||
let slice = buf.flush();
|
||||
|
||||
// Saves a buffer for read while flushing. This also removes reference to the old buffer.
|
||||
self.maybe_flushed = Some(slice.cheap_clone());
|
||||
|
||||
let (request, flush_control) = new_flush_op(slice, offset);
|
||||
|
||||
// Submits the buffer to the background task.
|
||||
let submit = self.inner_mut().channel.send(request).await;
|
||||
if submit.is_err() {
|
||||
return self.handle_error().await;
|
||||
}
|
||||
|
||||
// Wait for an available buffer from the background flush task.
|
||||
let Some(recycled) = self.inner_mut().channel.recv().await else {
|
||||
return self.handle_error().await;
|
||||
};
|
||||
|
||||
// The only other place that could hold a reference to the recycled buffer
|
||||
// is in `Self::maybe_flushed`, but we have already replace it with the new buffer.
|
||||
|
||||
let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
|
||||
Ok((recycled, flush_control))
|
||||
}
|
||||
|
||||
/// Cleans up the channel, join the flush task.
|
||||
pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
|
||||
let handle = self
|
||||
.inner
|
||||
.take()
|
||||
.expect("must not use after we returned an error");
|
||||
drop(handle.channel.tx);
|
||||
handle.join_handle.await.unwrap()
|
||||
}
|
||||
|
||||
/// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
|
||||
/// This only happens if the handle is used after an error.
|
||||
fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
async fn handle_error<T>(&mut self) -> std::io::Result<T> {
|
||||
Err(self.shutdown().await.unwrap_err())
|
||||
}
|
||||
}
|
||||
|
||||
/// A background task for flushing data to disk.
|
||||
pub struct FlushBackgroundTask<Buf, W> {
|
||||
/// A bi-directional channel that receives (buffer, offset) for writes,
|
||||
/// and send back recycled buffer.
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
/// A writter for persisting data to disk.
|
||||
writer: Arc<W>,
|
||||
ctx: RequestContext,
|
||||
/// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk.
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
impl<Buf, W> FlushBackgroundTask<Buf, W>
|
||||
where
|
||||
Buf: IoBufAligned + Send + Sync,
|
||||
W: OwnedAsyncWriter + Sync + 'static,
|
||||
{
|
||||
/// Creates a new background flush task.
|
||||
fn new(
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
file: Arc<W>,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: RequestContext,
|
||||
) -> Self {
|
||||
FlushBackgroundTask {
|
||||
channel,
|
||||
writer: file,
|
||||
_gate_guard: gate_guard,
|
||||
ctx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the background flush task.
|
||||
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
|
||||
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
|
||||
// Sends the extra buffer back to the handle.
|
||||
self.channel.send(slice).await.map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
|
||||
})?;
|
||||
|
||||
// Exit condition: channel is closed and there is no remaining buffer to be flushed
|
||||
while let Some(request) = self.channel.recv().await {
|
||||
#[cfg(test)]
|
||||
{
|
||||
// In test, wait for control to signal that we are ready to flush.
|
||||
if request.ready_to_flush_rx.await.is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
}
|
||||
}
|
||||
|
||||
// Write slice to disk at `offset`.
|
||||
let slice = self
|
||||
.writer
|
||||
.write_all_at(request.slice, request.offset, &self.ctx)
|
||||
.await?;
|
||||
|
||||
#[cfg(test)]
|
||||
{
|
||||
// In test, tell control we are done flushing buffer.
|
||||
if request.done_flush_tx.send(()).is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
}
|
||||
}
|
||||
|
||||
// Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer.
|
||||
if self.channel.send(slice).await.is_err() {
|
||||
// Although channel is closed. Still need to finish flushing the remaining buffers.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(self.writer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushNotStarted {
|
||||
ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushInProgress {
|
||||
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushDone;
|
||||
|
||||
#[cfg(test)]
|
||||
impl FlushNotStarted {
|
||||
/// Signals the background task the buffer is ready to flush to disk.
|
||||
pub fn ready_to_flush(self) -> FlushInProgress {
|
||||
self.ready_to_flush_tx
|
||||
.send(())
|
||||
.map(|_| FlushInProgress {
|
||||
done_flush_rx: self.done_flush_rx,
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl FlushInProgress {
|
||||
/// Waits until background flush is done.
|
||||
pub async fn wait_until_flush_is_done(self) -> FlushDone {
|
||||
self.done_flush_rx.await.unwrap();
|
||||
FlushDone
|
||||
}
|
||||
}
|
||||
@@ -334,14 +334,32 @@ impl WalIngest {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
//
|
||||
// TODO: analyze the metrics and tighten this up accordingly. This logic
|
||||
// implicitly assumes that VM pages see explicit WAL writes before
|
||||
// implicit ClearVmBits, and will otherwise silently drop updates.
|
||||
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["relation"])
|
||||
.inc();
|
||||
return Ok(());
|
||||
};
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["new_page"])
|
||||
.inc();
|
||||
new_vm_blk = None;
|
||||
}
|
||||
}
|
||||
if let Some(blknum) = old_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
WAL_INGEST
|
||||
.clear_vm_bits_unknown
|
||||
.with_label_values(&["old_page"])
|
||||
.inc();
|
||||
old_vm_blk = None;
|
||||
}
|
||||
}
|
||||
@@ -572,7 +590,8 @@ impl WalIngest {
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
// TODO: re-examine the None case here wrt. sharding; should we error?
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
|
||||
if nblocks > fsm_physical_page_no {
|
||||
// check if something to do: FSM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
|
||||
@@ -612,7 +631,8 @@ impl WalIngest {
|
||||
)?;
|
||||
vm_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
// TODO: re-examine the None case here wrt. sharding; should we error?
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
|
||||
if nblocks > vm_page_no {
|
||||
// check if something to do: VM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
|
||||
@@ -1430,24 +1450,27 @@ impl WalIngest {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the size of the relation as of this modification, or None if the relation doesn't exist.
|
||||
///
|
||||
/// This is only accurate on shard 0. On other shards, it will return the size up to the highest
|
||||
/// page number stored in the shard, or None if the shard does not have any pages for it.
|
||||
async fn get_relsize(
|
||||
modification: &DatadirModification<'_>,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
let nblocks = if !modification
|
||||
) -> Result<Option<BlockNumber>, PageReconstructError> {
|
||||
if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
0
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
return Ok(None);
|
||||
}
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await
|
||||
.map(Some)
|
||||
}
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
|
||||
@@ -99,16 +99,17 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
/// Try to cancel a running query for the corresponding connection.
|
||||
/// If the cancellation key is not found, it will be published to Redis.
|
||||
/// check_allowed - if true, check if the IP is allowed to cancel the query
|
||||
/// return Result primarily for tests
|
||||
pub(crate) async fn cancel_session(
|
||||
&self,
|
||||
key: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
peer_addr: &IpAddr,
|
||||
peer_addr: IpAddr,
|
||||
check_allowed: bool,
|
||||
) -> Result<(), CancelError> {
|
||||
// TODO: check for unspecified address is only for backward compatibility, should be removed
|
||||
if !peer_addr.is_unspecified() {
|
||||
let subnet_key = match *peer_addr {
|
||||
let subnet_key = match peer_addr {
|
||||
IpAddr::V4(ip) => IpNet::V4(Ipv4Net::new_assert(ip, 24).trunc()), // use defaut mask here
|
||||
IpAddr::V6(ip) => IpNet::V6(Ipv6Net::new_assert(ip, 64).trunc()),
|
||||
};
|
||||
@@ -141,9 +142,11 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match self.client.try_publish(key, session_id, *peer_addr).await {
|
||||
match self.client.try_publish(key, session_id, peer_addr).await {
|
||||
Ok(()) => {} // do nothing
|
||||
Err(e) => {
|
||||
// log it here since cancel_session could be spawned in a task
|
||||
tracing::error!("failed to publish cancellation key: {key}, error: {e}");
|
||||
return Err(CancelError::IO(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
e.to_string(),
|
||||
@@ -154,8 +157,10 @@ impl<P: CancellationPublisher> CancellationHandler<P> {
|
||||
};
|
||||
|
||||
if check_allowed
|
||||
&& !check_peer_addr_is_in_list(peer_addr, cancel_closure.ip_allowlist.as_slice())
|
||||
&& !check_peer_addr_is_in_list(&peer_addr, cancel_closure.ip_allowlist.as_slice())
|
||||
{
|
||||
// log it here since cancel_session could be spawned in a task
|
||||
tracing::warn!("IP is not allowed to cancel the query: {key}");
|
||||
return Err(CancelError::IpNotAllowed);
|
||||
}
|
||||
|
||||
@@ -306,7 +311,7 @@ mod tests {
|
||||
cancel_key: 0,
|
||||
},
|
||||
Uuid::new_v4(),
|
||||
&("127.0.0.1".parse().unwrap()),
|
||||
"127.0.0.1".parse().unwrap(),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -35,6 +35,7 @@ pub async fn task_main(
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
|
||||
while let Some(accept_result) =
|
||||
run_until_cancelled(listener.accept(), &cancellation_token).await
|
||||
@@ -48,6 +49,7 @@ pub async fn task_main(
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
let cancellations = cancellations.clone();
|
||||
|
||||
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
|
||||
@@ -96,6 +98,7 @@ pub async fn task_main(
|
||||
cancellation_handler,
|
||||
socket,
|
||||
conn_gauge,
|
||||
cancellations,
|
||||
)
|
||||
.instrument(ctx.span())
|
||||
.boxed()
|
||||
@@ -127,10 +130,12 @@ pub async fn task_main(
|
||||
}
|
||||
|
||||
connections.close();
|
||||
cancellations.close();
|
||||
drop(listener);
|
||||
|
||||
// Drain connections
|
||||
connections.wait().await;
|
||||
cancellations.wait().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -142,6 +147,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
stream: S,
|
||||
conn_gauge: NumClientConnectionsGuard<'static>,
|
||||
cancellations: tokio_util::task::task_tracker::TaskTracker,
|
||||
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
|
||||
debug!(
|
||||
protocol = %ctx.protocol(),
|
||||
@@ -161,15 +167,26 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
return Ok(cancellation_handler
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
ctx.session_id(),
|
||||
&ctx.peer_addr(),
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await
|
||||
.map(|()| None)?)
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
@@ -8,7 +8,7 @@ use pq_proto::StartupMessageParams;
|
||||
use smol_str::SmolStr;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::field::display;
|
||||
use tracing::{debug, info_span, Span};
|
||||
use tracing::{debug, error, info_span, Span};
|
||||
use try_lock::TryLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -415,9 +415,11 @@ impl RequestContextInner {
|
||||
});
|
||||
}
|
||||
if let Some(tx) = self.sender.take() {
|
||||
tx.send(RequestData::from(&*self))
|
||||
.inspect_err(|e| debug!("tx send failed: {e}"))
|
||||
.ok();
|
||||
// If type changes, this error handling needs to be updated.
|
||||
let tx: mpsc::UnboundedSender<RequestData> = tx;
|
||||
if let Err(e) = tx.send(RequestData::from(&*self)) {
|
||||
error!("log_connect channel send failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -426,9 +428,11 @@ impl RequestContextInner {
|
||||
// Here we log the length of the session.
|
||||
self.disconnect_timestamp = Some(Utc::now());
|
||||
if let Some(tx) = self.disconnect_sender.take() {
|
||||
tx.send(RequestData::from(&*self))
|
||||
.inspect_err(|e| debug!("tx send failed: {e}"))
|
||||
.ok();
|
||||
// If type changes, this error handling needs to be updated.
|
||||
let tx: mpsc::UnboundedSender<RequestData> = tx;
|
||||
if let Err(e) = tx.send(RequestData::from(&*self)) {
|
||||
error!("log_disconnect channel send failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +398,7 @@ async fn upload_parquet(
|
||||
.err();
|
||||
|
||||
if let Some(err) = maybe_err {
|
||||
tracing::warn!(%id, %err, "failed to upload request data");
|
||||
tracing::error!(%id, error = ?err, "failed to upload request data");
|
||||
}
|
||||
|
||||
Ok(buffer.writer())
|
||||
|
||||
@@ -69,6 +69,7 @@ pub async fn task_main(
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
|
||||
while let Some(accept_result) =
|
||||
run_until_cancelled(listener.accept(), &cancellation_token).await
|
||||
@@ -82,6 +83,7 @@ pub async fn task_main(
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
let cancellations = cancellations.clone();
|
||||
|
||||
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
|
||||
@@ -133,6 +135,7 @@ pub async fn task_main(
|
||||
ClientMode::Tcp,
|
||||
endpoint_rate_limiter2,
|
||||
conn_gauge,
|
||||
cancellations,
|
||||
)
|
||||
.instrument(ctx.span())
|
||||
.boxed()
|
||||
@@ -164,10 +167,12 @@ pub async fn task_main(
|
||||
}
|
||||
|
||||
connections.close();
|
||||
cancellations.close();
|
||||
drop(listener);
|
||||
|
||||
// Drain connections
|
||||
connections.wait().await;
|
||||
cancellations.wait().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -250,6 +255,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
mode: ClientMode,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
conn_gauge: NumClientConnectionsGuard<'static>,
|
||||
cancellations: tokio_util::task::task_tracker::TaskTracker,
|
||||
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
|
||||
debug!(
|
||||
protocol = %ctx.protocol(),
|
||||
@@ -270,15 +276,26 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
return Ok(cancellation_handler
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
ctx.session_id(),
|
||||
&ctx.peer_addr(),
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await
|
||||
.map(|()| None)?)
|
||||
// spawn a task to cancel the session, but don't wait for it
|
||||
cancellations.spawn({
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session_id = ctx.session_id();
|
||||
let peer_ip = ctx.peer_addr();
|
||||
async move {
|
||||
drop(
|
||||
cancellation_handler_clone
|
||||
.cancel_session(
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
peer_ip,
|
||||
config.authentication_config.ip_allowlist_check_enabled,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
drop(pause);
|
||||
|
||||
@@ -149,7 +149,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
.cancel_session(
|
||||
cancel_session.cancel_key_data,
|
||||
uuid::Uuid::nil(),
|
||||
&peer_addr,
|
||||
peer_addr,
|
||||
cancel_session.peer_addr.is_some(),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -132,6 +132,7 @@ pub async fn task_main(
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
connections.close(); // allows `connections.wait to complete`
|
||||
|
||||
let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
while let Some(res) = run_until_cancelled(ws_listener.accept(), &cancellation_token).await {
|
||||
let (conn, peer_addr) = res.context("could not accept TCP stream")?;
|
||||
if let Err(e) = conn.set_nodelay(true) {
|
||||
@@ -160,6 +161,7 @@ pub async fn task_main(
|
||||
let connections2 = connections.clone();
|
||||
let cancellation_handler = cancellation_handler.clone();
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
let cancellations = cancellations.clone();
|
||||
connections.spawn(
|
||||
async move {
|
||||
let conn_token2 = conn_token.clone();
|
||||
@@ -188,6 +190,7 @@ pub async fn task_main(
|
||||
config,
|
||||
backend,
|
||||
connections2,
|
||||
cancellations,
|
||||
cancellation_handler,
|
||||
endpoint_rate_limiter,
|
||||
conn_token,
|
||||
@@ -313,6 +316,7 @@ async fn connection_handler(
|
||||
config: &'static ProxyConfig,
|
||||
backend: Arc<PoolingBackend>,
|
||||
connections: TaskTracker,
|
||||
cancellations: TaskTracker,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_token: CancellationToken,
|
||||
@@ -353,6 +357,7 @@ async fn connection_handler(
|
||||
|
||||
// `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
|
||||
// By spawning the future, we ensure it never gets cancelled until it decides to.
|
||||
let cancellations = cancellations.clone();
|
||||
let handler = connections.spawn(
|
||||
request_handler(
|
||||
req,
|
||||
@@ -364,6 +369,7 @@ async fn connection_handler(
|
||||
conn_info2.clone(),
|
||||
http_request_token,
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellations,
|
||||
)
|
||||
.in_current_span()
|
||||
.map_ok_or_else(api_error_into_response, |r| r),
|
||||
@@ -411,6 +417,7 @@ async fn request_handler(
|
||||
// used to cancel in-flight HTTP requests. not used to cancel websockets
|
||||
http_cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellations: TaskTracker,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ApiError> {
|
||||
let host = request
|
||||
.headers()
|
||||
@@ -436,6 +443,7 @@ async fn request_handler(
|
||||
let (response, websocket) = framed_websockets::upgrade::upgrade(&mut request)
|
||||
.map_err(|e| ApiError::BadRequest(e.into()))?;
|
||||
|
||||
let cancellations = cancellations.clone();
|
||||
ws_connections.spawn(
|
||||
async move {
|
||||
if let Err(e) = websocket::serve_websocket(
|
||||
@@ -446,6 +454,7 @@ async fn request_handler(
|
||||
cancellation_handler,
|
||||
endpoint_rate_limiter,
|
||||
host,
|
||||
cancellations,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -123,6 +123,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn serve_websocket(
|
||||
config: &'static ProxyConfig,
|
||||
auth_backend: &'static crate::auth::Backend<'static, ()>,
|
||||
@@ -131,6 +132,7 @@ pub(crate) async fn serve_websocket(
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
hostname: Option<String>,
|
||||
cancellations: tokio_util::task::task_tracker::TaskTracker,
|
||||
) -> anyhow::Result<()> {
|
||||
let websocket = websocket.await?;
|
||||
let websocket = WebSocketServer::after_handshake(TokioIo::new(websocket));
|
||||
@@ -149,6 +151,7 @@ pub(crate) async fn serve_websocket(
|
||||
ClientMode::Websockets { hostname },
|
||||
endpoint_rate_limiter,
|
||||
conn_gauge,
|
||||
cancellations,
|
||||
))
|
||||
.await;
|
||||
|
||||
|
||||
@@ -128,7 +128,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
|
||||
|
||||
let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
|
||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
result.errors.push(format!(
|
||||
result.warnings.push(format!(
|
||||
"index_part.json contains invalid layer map structure: {err}"
|
||||
));
|
||||
}
|
||||
|
||||
@@ -469,6 +469,7 @@ class NeonEnvBuilder:
|
||||
default_remote_storage_if_missing: bool = True,
|
||||
initial_tenant_shard_count: int | None = None,
|
||||
initial_tenant_shard_stripe_size: int | None = None,
|
||||
timeout_in_seconds: int | None = None,
|
||||
) -> NeonEnv:
|
||||
"""
|
||||
Default way to create and start NeonEnv. Also creates the initial_tenant with root initial_timeline.
|
||||
@@ -478,7 +479,13 @@ class NeonEnvBuilder:
|
||||
Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
|
||||
"""
|
||||
env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
|
||||
env.start()
|
||||
if timeout_in_seconds is None:
|
||||
if os.getenv("BUILD_TYPE") == "release":
|
||||
timeout_in_seconds = 15
|
||||
elif os.getenv("BUILD_TYPE") == "debug":
|
||||
timeout_in_seconds = 30
|
||||
|
||||
env.start(timeout_in_seconds=timeout_in_seconds)
|
||||
|
||||
# Prepare the default branch to start the postgres on later.
|
||||
# Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
|
||||
|
||||
@@ -1,114 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("size", [8, 64, 1024, 8192])
|
||||
@pytest.mark.parametrize("backpressure", [True, False])
|
||||
@pytest.mark.parametrize("fsync", [True, False])
|
||||
def test_ingest_insert_bulk(
|
||||
request: pytest.FixtureRequest,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
fsync: bool,
|
||||
backpressure: bool,
|
||||
size: int,
|
||||
):
|
||||
"""
|
||||
Benchmarks ingestion of 8 GB of sequential insert WAL with concurrent inserts.
|
||||
"""
|
||||
|
||||
CONCURRENCY = 1 # 1 is optimal without fsync or backpressure
|
||||
VOLUME = 8 * 1024**3
|
||||
rows = VOLUME // (size + 64) # +64 roughly accounts for per-row WAL overhead
|
||||
|
||||
# Change Direct IO modes
|
||||
neon_env_builder.pageserver_virtual_file_io_mode = "direct"
|
||||
neon_env_builder.safekeepers_enable_fsync = fsync
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# NB: neon_local defaults to max_replication_write_lag=15MB, which is too low.
|
||||
# Production uses 500MB.
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
f"fsync = {fsync}",
|
||||
"max_replication_apply_lag = 0",
|
||||
f"max_replication_flush_lag = {'10GB' if backpressure else '0'}",
|
||||
f"max_replication_write_lag = {'500MB' if backpressure else '0'}",
|
||||
],
|
||||
)
|
||||
endpoint.safe_psql("create extension neon")
|
||||
|
||||
# Wait for the timeline to be propagated to the pageserver.
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# Ingest rows.
|
||||
log.info("Ingesting data")
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
def insert_rows(endpoint, table, count, value):
|
||||
with endpoint.connect().cursor() as cur:
|
||||
cur.execute("set statement_timeout = 0")
|
||||
cur.execute(f"create table {table} (id int, data bytea)")
|
||||
cur.execute(f"insert into {table} values (generate_series(1, {count}), %s)", (value,))
|
||||
|
||||
with zenbenchmark.record_duration("ingest"):
|
||||
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
|
||||
for i in range(CONCURRENCY):
|
||||
# Write a random value for all rows. This is sufficient to prevent compression, e.g.
|
||||
# in TOAST. Randomly generating every row is too slow.
|
||||
value = random.randbytes(size)
|
||||
worker_rows = rows / CONCURRENCY
|
||||
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
client = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||
|
||||
backpressure_time = endpoint.safe_psql("select backpressure_throttling_time()")[0][0]
|
||||
|
||||
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
|
||||
# reingest all the WAL directly from the safekeeper. This gives us a baseline of how fast the
|
||||
# pageserver can ingest this WAL in isolation.
|
||||
pg_version = PgVersion(
|
||||
str(client.timeline_detail(env.initial_tenant, env.initial_timeline)["pg_version"])
|
||||
)
|
||||
status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
|
||||
assert status is not None
|
||||
|
||||
endpoint.stop() # avoid spurious getpage errors
|
||||
client.tenant_delete(env.initial_tenant)
|
||||
env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
|
||||
|
||||
with zenbenchmark.record_duration("recover"):
|
||||
log.info("Recovering WAL into pageserver")
|
||||
client.timeline_create(pg_version, env.initial_tenant, env.initial_timeline)
|
||||
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||
|
||||
# Emit metrics.
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record("row_count", rows, "rows", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record("concurrency", CONCURRENCY, "clients", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record(
|
||||
"backpressure_time", backpressure_time // 1000, "ms", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
props = {p["name"]: p["value"] for _, p in request.node.user_properties}
|
||||
for name in ("ingest", "recover"):
|
||||
throughput = int(wal_written_mb / props[name])
|
||||
zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)
|
||||
Reference in New Issue
Block a user