mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 11:10:36 +00:00
Compare commits
2 Commits
release-pr
...
jcsp/neonm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5d404698a8 | ||
|
|
afe3c32b66 |
22
NEONMQ.md
Normal file
22
NEONMQ.md
Normal file
@@ -0,0 +1,22 @@
|
||||
|
||||
# NeonMQ
|
||||
|
||||
## High level design
|
||||
|
||||
In the prototype, the pageserver exposes HTTP endpoints for produce and consume.
|
||||
In a real system, produce would go via the safekeeper. Topics would be 'timelines'
|
||||
from the safekeeper's point of view.
|
||||
|
||||
A Timeline is used for storage. In the prototype this is just the same timeline that
|
||||
stores some real database content too. In a real system, it might be a dedicated timeline.
|
||||
|
||||
|
||||
## Server
|
||||
|
||||
|
||||
|
||||
## Client
|
||||
|
||||
Producers POST their messages.
|
||||
|
||||
Consumers poll, supply a topic and offset, and receive messages ahead of the offset.
|
||||
25
demo.sh
Executable file
25
demo.sh
Executable file
@@ -0,0 +1,25 @@
|
||||
|
||||
cargo neon stop
|
||||
rm -rf .neon
|
||||
cargo neon init
|
||||
cargo neon start
|
||||
|
||||
export TENANT_ID=31ad8e1bfec849f61227b4c09f480b5f
|
||||
export TIMELINE_ID=514099e6b4fad0c0612b3a6b8cf785fb
|
||||
export CHILD_TIMELINE_ID=85321f977b4f9579c5fce1948f484652
|
||||
|
||||
cargo neon tenant create --tenant-id=$TENANT_ID --timeline-id=$TIMELINE_ID
|
||||
|
||||
# Produce some:
|
||||
# ./neonmq.py produce mytopic ohai
|
||||
|
||||
# Show consumption:
|
||||
# ./neonmq.py consume mytopic 1
|
||||
|
||||
# for word in shipping faster with postgres ; do ./neonmq.py produce mytopic $word ; done
|
||||
|
||||
# Make a child
|
||||
# cargo neon timeline branch --tenant-id=31ad8e1bfec849f61227b4c09f480b5f --timeline-id=85321f977b4f9579c5fce1948f484652 --ancestor-branch-name=main --branch-name=child1
|
||||
|
||||
# Produce to child
|
||||
# ./neonmq.py produce mytopic ohai 85321f977b4f9579c5fce1948f484652
|
||||
@@ -47,6 +47,9 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
|
||||
/// The key prefix of ReplOrigin keys.
|
||||
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
|
||||
|
||||
/// The key prefix of Event keys
|
||||
pub const EVENT_KEY_PREFIX: u8 = 0x64;
|
||||
|
||||
/// Check if the key falls in the range of metadata keys.
|
||||
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
|
||||
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
|
||||
@@ -89,6 +92,24 @@ impl Key {
|
||||
}
|
||||
}
|
||||
|
||||
pub const fn metadata_noninherited_key_range() -> Range<Self> {
|
||||
Key {
|
||||
field1: METADATA_KEY_BEGIN_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: EVENT_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the range of aux keys.
|
||||
pub fn metadata_aux_key_range() -> Range<Self> {
|
||||
Key {
|
||||
@@ -679,7 +700,7 @@ pub fn repl_origin_key_range() -> Range<Key> {
|
||||
/// Non inherited range for vectored get.
|
||||
pub const NON_INHERITED_RANGE: Range<Key> = AUX_FILES_KEY..AUX_FILES_KEY.next();
|
||||
/// Sparse keyspace range for vectored get. Missing key error will be ignored for this range.
|
||||
pub const NON_INHERITED_SPARSE_RANGE: Range<Key> = Key::metadata_key_range();
|
||||
pub const NON_INHERITED_SPARSE_RANGE: Range<Key> = Key::metadata_noninherited_key_range();
|
||||
|
||||
impl Key {
|
||||
// AUX_FILES currently stores only data for logical replication (slots etc), and
|
||||
|
||||
64
neonmq.py
Executable file
64
neonmq.py
Executable file
@@ -0,0 +1,64 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
tenants_r = requests.get("http://localhost:9898/v1/tenant")
|
||||
tenants_r.raise_for_status()
|
||||
tenant = tenants_r.json()[0]
|
||||
tenant_id = tenant["id"]
|
||||
|
||||
timelines_r = requests.get(f"http://localhost:9898/v1/tenant/{tenant_id}/timeline")
|
||||
timelines_r.raise_for_status()
|
||||
timeline = timelines_r.json()[0]
|
||||
default_timeline_id = timeline["timeline_id"]
|
||||
|
||||
# print(f"Using MQ timeline {tenant_id}/{timeline_id}")
|
||||
|
||||
if sys.argv[1] == "consume":
|
||||
topic_name = sys.argv[2]
|
||||
offset = int(sys.argv[3])
|
||||
|
||||
try:
|
||||
timeline_id = sys.argv[4]
|
||||
except IndexError:
|
||||
timeline_id = default_timeline_id
|
||||
|
||||
while True:
|
||||
try:
|
||||
response = requests.get(
|
||||
f"http://localhost:9898/v1/tenant/{tenant_id}/timeline/{timeline_id}/event_consume/{topic_name}/{offset}"
|
||||
)
|
||||
if response.status_code == 200:
|
||||
print(response.content)
|
||||
stringized = "".join(map(chr, response.json()["payload"]))
|
||||
print(stringized)
|
||||
offset += 1
|
||||
elif response.status_code == 408:
|
||||
# Proceed to start another long-poll
|
||||
pass
|
||||
else:
|
||||
print(response.content)
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
time.sleep(5)
|
||||
|
||||
elif sys.argv[1] == "produce":
|
||||
topic_name = sys.argv[2]
|
||||
data = sys.argv[3]
|
||||
|
||||
try:
|
||||
timeline_id = sys.argv[4]
|
||||
except IndexError:
|
||||
timeline_id = default_timeline_id
|
||||
|
||||
response = requests.post(
|
||||
f"http://localhost:9898/v1/tenant/{tenant_id}/timeline/{timeline_id}/event_produce/{topic_name}",
|
||||
data=data,
|
||||
)
|
||||
if response.status_code != 200:
|
||||
print(response.content)
|
||||
response.raise_for_status()
|
||||
@@ -4,11 +4,15 @@
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Read;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use enumset::EnumSet;
|
||||
use futures::StreamExt;
|
||||
use futures::TryFutureExt;
|
||||
@@ -77,6 +81,8 @@ use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::EventAck;
|
||||
use crate::tenant::timeline::EventOffset;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
@@ -2008,6 +2014,80 @@ async fn getpage_at_lsn_handler(
|
||||
.await
|
||||
}
|
||||
|
||||
async fn timeline_event_produce(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let topic_name: String = parse_request_param(&request, "topic_name")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
|
||||
let body = hyper::body::aggregate(request.body_mut())
|
||||
.await
|
||||
.context("Failed to read request body")
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
|
||||
if body.remaining() == 0 {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"missing request body"
|
||||
)));
|
||||
}
|
||||
let mut buffer = Vec::new();
|
||||
body.reader()
|
||||
.read_to_end(&mut buffer)
|
||||
.map_err(|e| ApiError::BadRequest(anyhow::anyhow!(e)))?;
|
||||
let payload = Bytes::from(buffer);
|
||||
|
||||
let offset = timeline
|
||||
.topics
|
||||
.produce(topic_name, &timeline, payload, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, EventAck { offset })
|
||||
}
|
||||
|
||||
async fn timeline_event_consume(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
let topic_name: String = parse_request_param(&request, "topic_name")?;
|
||||
let offset: EventOffset = parse_request_param(&request, "offset")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
let state = get_state(&request);
|
||||
|
||||
let timeline =
|
||||
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
|
||||
.await?;
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
loop {
|
||||
if Instant::now() > deadline {
|
||||
return json_response(StatusCode::REQUEST_TIMEOUT, "no data yet");
|
||||
}
|
||||
|
||||
let event = timeline
|
||||
.topics
|
||||
.consume(topic_name.clone(), offset, &timeline, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
if let Some(event) = event {
|
||||
return json_response(StatusCode::OK, event);
|
||||
};
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn timeline_collect_keyspace(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -3084,5 +3164,17 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal",
|
||||
|r| api_handler(r, put_tenant_timeline_import_wal),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/perf_info",
|
||||
|r| testing_api_handler("perf_info", r, perf_info),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/event_produce/:topic_name",
|
||||
|r| api_handler(r, timeline_event_produce),
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/event_consume/:topic_name/:offset",
|
||||
|r| api_handler(r, timeline_event_consume),
|
||||
)
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use handle::ShardTimelineId;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
key::{
|
||||
CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
|
||||
CompactKey, EVENT_KEY_PREFIX, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
|
||||
NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
|
||||
},
|
||||
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
|
||||
@@ -35,6 +35,7 @@ use pageserver_api::{
|
||||
shard::{ShardIdentity, ShardNumber, TenantShardId},
|
||||
};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::{
|
||||
@@ -44,11 +45,11 @@ use tokio::{
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
fs_ext, pausable_failpoint,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
};
|
||||
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
@@ -62,6 +63,7 @@ use std::{
|
||||
collections::btree_map::Entry,
|
||||
ops::{Deref, Range},
|
||||
};
|
||||
use std::{pin::pin, sync::atomic::AtomicU8};
|
||||
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
@@ -162,6 +164,293 @@ pub(crate) enum FlushLoopState {
|
||||
Exited,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub(crate) struct Event {
|
||||
offset: EventOffset,
|
||||
payload: EventPayload,
|
||||
}
|
||||
|
||||
/// Response to producers
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub(crate) struct EventAck {
|
||||
pub(crate) offset: EventOffset,
|
||||
}
|
||||
|
||||
struct Topic {
|
||||
name: String,
|
||||
id: TopicId,
|
||||
next_offset: EventOffset,
|
||||
}
|
||||
|
||||
/// Info about topic which is persisted
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct TopicHeader {
|
||||
name: String,
|
||||
next_offset: EventOffset,
|
||||
}
|
||||
|
||||
type TopicId = u32;
|
||||
type TopicName = String;
|
||||
// Offsets should really be u64, using u32 for convenience to avoid packing
|
||||
// u64 into Key fields.
|
||||
pub(crate) type EventOffset = u32;
|
||||
type EventPayload = Bytes;
|
||||
|
||||
struct TopicsInner {
|
||||
next_id: TopicId,
|
||||
topics: HashMap<TopicName, Topic>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct TopicSuperblock {
|
||||
next_id: TopicId,
|
||||
topics: Vec<TopicId>,
|
||||
}
|
||||
|
||||
pub(crate) struct Topics {
|
||||
loaded: AtomicU8,
|
||||
inner: Mutex<TopicsInner>,
|
||||
}
|
||||
|
||||
fn topic_header_key(topic_id: TopicId) -> Key {
|
||||
Key {
|
||||
field1: EVENT_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: topic_id,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
impl Topics {
|
||||
fn superblock_key(&self) -> Key {
|
||||
Key {
|
||||
field1: EVENT_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
async fn load(&self, timeline: &Timeline, ctx: &RequestContext) -> anyhow::Result<TopicsInner> {
|
||||
let superblock_bytes = match timeline
|
||||
.get(self.superblock_key(), timeline.get_last_record_lsn(), ctx)
|
||||
.await
|
||||
{
|
||||
Ok(bytes) => bytes,
|
||||
Err(PageReconstructError::MissingKey(_)) => {
|
||||
return Ok(TopicsInner {
|
||||
next_id: 1,
|
||||
topics: Default::default(),
|
||||
})
|
||||
}
|
||||
Err(e) => return Err(anyhow::anyhow!(e)),
|
||||
};
|
||||
|
||||
let superblock: TopicSuperblock = serde_json::from_slice(&superblock_bytes).unwrap();
|
||||
|
||||
let mut inner = TopicsInner {
|
||||
topics: Default::default(),
|
||||
next_id: superblock.next_id,
|
||||
};
|
||||
|
||||
for topic_id in superblock.topics {
|
||||
let header_bytes = timeline
|
||||
.get(
|
||||
topic_header_key(topic_id),
|
||||
timeline.get_last_record_lsn(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let header: TopicHeader = serde_json::from_slice(&header_bytes).unwrap();
|
||||
|
||||
inner.topics.insert(
|
||||
header.name.clone(),
|
||||
Topic {
|
||||
name: header.name,
|
||||
id: topic_id,
|
||||
next_offset: header.next_offset,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
Ok(inner)
|
||||
}
|
||||
|
||||
pub(crate) async fn produce(
|
||||
&self,
|
||||
topic_name: String,
|
||||
timeline: &Timeline,
|
||||
payload: EventPayload,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<EventOffset> {
|
||||
if self.loaded.load(AtomicOrdering::Relaxed) == 0 {
|
||||
let new_inner = self.load(timeline, ctx).await?;
|
||||
*self.inner.lock().unwrap() = new_inner;
|
||||
self.loaded.store(1, AtomicOrdering::Relaxed)
|
||||
}
|
||||
let (lsn, offset, write_keys) = {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
// Taking LSN inside self.inner lock to serialize all updates
|
||||
// Advance 8 bytes between imaginary LSNs, to preserve alignment
|
||||
let lsn = Lsn(timeline.last_record_lsn.load().last.0 + 8);
|
||||
|
||||
let mut next_id = Some(inner.next_id);
|
||||
|
||||
use std::collections::hash_map;
|
||||
let (offset, mut write_keys) = match inner.topics.entry(topic_name.clone()) {
|
||||
hash_map::Entry::Vacant(e) => {
|
||||
let topic = Topic {
|
||||
next_offset: 1,
|
||||
name: topic_name.clone(),
|
||||
id: next_id.take().unwrap(),
|
||||
};
|
||||
|
||||
e.insert(topic).produce(payload, lsn)?
|
||||
}
|
||||
hash_map::Entry::Occupied(mut e) => e.get_mut().produce(payload, lsn)?,
|
||||
};
|
||||
|
||||
if next_id.is_none() {
|
||||
inner.next_id += 1;
|
||||
|
||||
// This write transaction includes a superblock update. This must land atomically wrt
|
||||
// the writes to our newly created topic.
|
||||
let superblock = TopicSuperblock {
|
||||
next_id: inner.next_id,
|
||||
topics: inner.topics.values().map(|t| t.id).collect(),
|
||||
};
|
||||
let superblock_bytes = serde_json::to_vec(&superblock).unwrap();
|
||||
let value = Value::Image(superblock_bytes.into());
|
||||
write_keys.push((
|
||||
self.superblock_key().to_compact(),
|
||||
lsn,
|
||||
value.serialized_size().unwrap() as usize,
|
||||
value,
|
||||
));
|
||||
}
|
||||
|
||||
(lsn, offset, write_keys)
|
||||
};
|
||||
|
||||
// FIXME: racing between picking lsn above and actually writing, all
|
||||
// writes should go into a channel.
|
||||
|
||||
// Compose a write transaction: write the event to a page,
|
||||
// and write the topic header to update knowledge of the
|
||||
// next offset
|
||||
let mut writer = timeline.writer().await;
|
||||
writer.put_batch(write_keys, ctx).await?;
|
||||
writer.finish_write(lsn);
|
||||
|
||||
Ok(offset)
|
||||
}
|
||||
|
||||
pub(crate) async fn consume(
|
||||
&self,
|
||||
topic_name: String,
|
||||
offset: EventOffset,
|
||||
timeline: &Timeline,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<Event>> {
|
||||
let topic_id = {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
let Some(topic) = inner.topics.get(&topic_name) else {
|
||||
return Err(anyhow::anyhow!("Topic not found"));
|
||||
};
|
||||
|
||||
if topic.next_offset <= offset {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
topic.id
|
||||
};
|
||||
|
||||
let event_key = Key {
|
||||
field1: EVENT_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: topic_id,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: offset,
|
||||
};
|
||||
|
||||
let lsn = timeline.last_record_lsn.load().last;
|
||||
|
||||
match timeline.get(event_key, lsn, ctx).await {
|
||||
Ok(event_bytes) => {
|
||||
let event = serde_json::from_slice(&event_bytes).unwrap();
|
||||
tracing::info!("Event found at {}/{}", topic_name, offset);
|
||||
Ok(Some(event))
|
||||
}
|
||||
Err(PageReconstructError::MissingKey(_)) => {
|
||||
// Unexpected: we checked against next_offset above
|
||||
tracing::warn!("No event found at {}/{}", topic_name, offset);
|
||||
Ok(None)
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type KeyBatch = Vec<(CompactKey, Lsn, usize, Value)>;
|
||||
|
||||
impl Topic {
|
||||
pub(crate) fn produce(
|
||||
&mut self,
|
||||
payload: EventPayload,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<(EventOffset, KeyBatch)> {
|
||||
let offset = self.next_offset;
|
||||
self.next_offset += 1;
|
||||
|
||||
let event = Event { offset, payload };
|
||||
|
||||
let header_key = topic_header_key(self.id);
|
||||
|
||||
let event_key = Key {
|
||||
field1: EVENT_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: self.id,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: offset,
|
||||
};
|
||||
|
||||
let header = TopicHeader {
|
||||
name: self.name.clone(),
|
||||
next_offset: self.next_offset,
|
||||
};
|
||||
|
||||
let header_bytes = serde_json::to_vec(&header).unwrap().into();
|
||||
let event_bytes = serde_json::to_vec(&event).unwrap().into();
|
||||
|
||||
let header_value = Value::Image(header_bytes);
|
||||
let event_value = Value::Image(event_bytes);
|
||||
|
||||
let write_keys = vec![
|
||||
(
|
||||
header_key.to_compact(),
|
||||
lsn,
|
||||
header_value.serialized_size().unwrap() as usize,
|
||||
header_value,
|
||||
),
|
||||
(
|
||||
event_key.to_compact(),
|
||||
lsn,
|
||||
event_value.serialized_size().unwrap() as usize,
|
||||
event_value,
|
||||
),
|
||||
];
|
||||
|
||||
Ok((offset, write_keys))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum ImageLayerCreationMode {
|
||||
/// Try to create image layers based on `time_for_new_image_layer`. Used in compaction code path.
|
||||
@@ -430,6 +719,8 @@ pub struct Timeline {
|
||||
pub(crate) l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
pub(crate) handles: handle::PerTimelineState<crate::page_service::TenantManagerTypes>,
|
||||
|
||||
pub(crate) topics: Topics,
|
||||
}
|
||||
|
||||
pub struct WalReceiverInfo {
|
||||
@@ -2240,6 +2531,15 @@ impl Timeline {
|
||||
l0_flush_global_state: resources.l0_flush_global_state,
|
||||
|
||||
handles: Default::default(),
|
||||
|
||||
topics: Topics {
|
||||
inner: Mutex::new(TopicsInner {
|
||||
// TODO: recover from disk on startup
|
||||
next_id: 123,
|
||||
topics: Default::default(),
|
||||
}),
|
||||
loaded: Default::default(),
|
||||
},
|
||||
};
|
||||
|
||||
if aux_file_policy == Some(AuxFilePolicy::V1) {
|
||||
|
||||
Reference in New Issue
Block a user