feat: buffered parquet writer (#1263)

* wip: use

* rebase develop

* chore: fix typos

* feat: replace export parquet writer with buffered writer

* fix: some cr comments

* feat: add sst_write_buffer_size config item to config how many bytes to buffer before flush to underlying storage

* chore: reabse onto develop
This commit is contained in:
Lei, HUANG
2023-04-01 17:21:19 +08:00
committed by GitHub
parent 6a05f617a4
commit 0253136333
20 changed files with 346 additions and 172 deletions

View File

@@ -178,6 +178,7 @@ mod tests {
use std::io::Write;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig};
use servers::Mode;
@@ -266,6 +267,7 @@ mod tests {
max_inflight_tasks: 3,
max_files_in_level0: 7,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
},
options.storage.compaction,
);

View File

@@ -53,6 +53,10 @@ impl ReadableSize {
pub const fn as_mb(self) -> u64 {
self.0 / MIB
}
pub const fn as_bytes(self) -> u64 {
self.0
}
}
impl Div<u64> for ReadableSize {

View File

@@ -150,6 +150,8 @@ pub struct CompactionConfig {
pub max_files_in_level0: usize,
/// Max task number for SST purge task after compaction.
pub max_purge_tasks: usize,
/// Buffer threshold while writing SST files
pub sst_write_buffer_size: ReadableSize,
}
impl Default for CompactionConfig {
@@ -158,6 +160,7 @@ impl Default for CompactionConfig {
max_inflight_tasks: 4,
max_files_in_level0: 8,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
}
}
}
@@ -177,6 +180,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
manifest_gc_duration: value.storage.manifest.gc_duration,
max_files_in_l0: value.storage.compaction.max_files_in_level0,
max_purge_tasks: value.storage.compaction.max_purge_tasks,
sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
}
}
}

View File

@@ -439,12 +439,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Failed to write parquet file, source: {}", source))]
WriteParquet {
source: parquet::errors::ParquetError,
backtrace: Backtrace,
},
#[snafu(display("Failed to poll stream, source: {}", source))]
PollStream {
source: datafusion_common::DataFusionError,
@@ -514,6 +508,12 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to copy table to parquet file, source: {}", source))]
WriteParquet {
#[snafu(backtrace)]
source: storage::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -12,24 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use common_datasource;
use common_datasource::object_store::{build_backend, parse_url};
use common_query::physical_plan::SessionContext;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::RecordBatchStream;
use futures::TryStreamExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use storage::sst::SstInfo;
use storage::{ParquetWriter, Source};
use table::engine::TableReference;
use table::requests::CopyTableRequest;
use crate::error::{self, Result};
use crate::error::{self, Result, WriteParquetSnafu};
use crate::sql::SqlHandler;
impl SqlHandler {
@@ -51,99 +44,20 @@ impl SqlHandler {
let stream = stream
.execute(0, SessionContext::default().task_ctx())
.context(error::TableScanExecSnafu)?;
let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream));
let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
let object_store =
build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
let mut parquet_writer = ParquetWriter::new(path.to_string(), stream, object_store);
// TODO(jiachun):
// For now, COPY is implemented synchronously.
// When copying large table, it will be blocked for a long time.
// Maybe we should make "copy" runs in background?
// Like PG: https://www.postgresql.org/docs/current/sql-copy.html
let rows = parquet_writer.flush().await?;
let writer = ParquetWriter::new(&path, Source::Stream(stream), object_store);
Ok(Output::AffectedRows(rows))
}
}
type DfRecordBatchStream = Pin<Box<DfRecordBatchStreamAdapter>>;
struct ParquetWriter {
file_name: String,
stream: DfRecordBatchStream,
object_store: ObjectStore,
max_row_group_size: usize,
max_rows_in_segment: usize,
}
impl ParquetWriter {
pub fn new(file_name: String, stream: DfRecordBatchStream, object_store: ObjectStore) -> Self {
Self {
file_name,
stream,
object_store,
// TODO(jiachun): make these configurable: WITH (max_row_group_size=xxx, max_rows_in_segment=xxx)
max_row_group_size: 4096,
max_rows_in_segment: 5000000, // default 5M rows per segment
}
}
pub async fn flush(&mut self) -> Result<usize> {
let schema = self.stream.as_ref().schema();
let writer_props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(self.max_row_group_size)
.build();
let mut total_rows = 0;
loop {
let mut buf = vec![];
let mut arrow_writer =
ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props.clone()))
.context(error::WriteParquetSnafu)?;
let mut rows = 0;
let mut end_loop = true;
// TODO(hl & jiachun): Since OpenDAL's writer is async and ArrowWriter requires a `std::io::Write`,
// here we use a Vec<u8> to buffer all parquet bytes in memory and write to object store
// at a time. Maybe we should find a better way to bridge ArrowWriter and OpenDAL's object.
while let Some(batch) = self
.stream
.try_next()
.await
.context(error::PollStreamSnafu)?
{
arrow_writer
.write(&batch)
.context(error::WriteParquetSnafu)?;
rows += batch.num_rows();
if rows >= self.max_rows_in_segment {
end_loop = false;
break;
}
}
let start_row_num = total_rows + 1;
total_rows += rows;
arrow_writer.close().context(error::WriteParquetSnafu)?;
// if rows == 0, we just end up with an empty file.
//
// file_name like:
// "file_name_1_1000000" (row num: 1 ~ 1000000),
// "file_name_1000001_xxx" (row num: 1000001 ~ xxx)
let file_name = format!("{}_{}_{}", self.file_name, start_row_num, total_rows);
self.object_store
.write(&file_name, buf)
.await
.context(error::WriteObjectSnafu { path: file_name })?;
if end_loop {
return Ok(total_rows);
}
}
let rows_copied = writer
.write_sst(&storage::sst::WriteOptions::default())
.await
.context(WriteParquetSnafu)?
.map(|SstInfo { num_rows, .. }| num_rows)
.unwrap_or(0);
Ok(Output::AffectedRows(rows_copied))
}
}

View File

@@ -16,7 +16,7 @@ pub use opendal::raw::normalize_path as raw_normalize_path;
pub use opendal::raw::oio::Pager;
pub use opendal::{
layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
Operator as ObjectStore, Result,
Operator as ObjectStore, Result, Writer,
};
pub mod cache_policy;

View File

@@ -132,6 +132,7 @@ impl<S: LogStore> Picker for SimplePicker<S> {
wal: req.wal.clone(),
manifest: req.manifest.clone(),
expired_ssts,
sst_write_buffer_size: req.sst_write_buffer_size,
}));
}

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
@@ -60,6 +61,8 @@ pub struct CompactionRequestImpl<S: LogStore> {
pub ttl: Option<Duration>,
/// Compaction result sender.
pub sender: Option<Sender<Result<()>>>,
pub sst_write_buffer_size: ReadableSize,
}
impl<S: LogStore> CompactionRequestImpl<S> {

View File

@@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::fmt::{Debug, Formatter};
use common_base::readable_size::ReadableSize;
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
@@ -46,6 +47,7 @@ pub struct CompactionTaskImpl<S: LogStore> {
pub wal: Wal<S>,
pub manifest: RegionManifest,
pub expired_ssts: Vec<FileHandle>,
pub sst_write_buffer_size: ReadableSize,
}
impl<S: LogStore> Debug for CompactionTaskImpl<S> {
@@ -71,14 +73,14 @@ impl<S: LogStore> CompactionTaskImpl<S> {
for output in self.outputs.drain(..) {
let schema = self.schema.clone();
let sst_layer = self.sst_layer.clone();
let sst_write_buffer_size = self.sst_write_buffer_size;
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
// TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
futs.push(async move {
match output.build(region_id, schema, sst_layer).await {
Ok(meta) => Ok(meta),
Err(e) => Err(e),
}
output
.build(region_id, schema, sst_layer, sst_write_buffer_size)
.await
});
}
@@ -172,6 +174,7 @@ impl CompactionOutput {
region_id: RegionId,
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
sst_write_buffer_size: ReadableSize,
) -> Result<Option<FileMeta>> {
let reader = build_sst_reader(
schema,
@@ -183,7 +186,9 @@ impl CompactionOutput {
.await?;
let output_file_id = FileId::random();
let opts = WriteOptions {};
let opts = WriteOptions {
sst_write_buffer_size,
};
Ok(sst_layer
.write_sst(output_file_id, Source::Reader(reader), &opts)
@@ -192,6 +197,7 @@ impl CompactionOutput {
|SstInfo {
time_range,
file_size,
..
}| FileMeta {
region_id,
file_id: output_file_id,

View File

@@ -85,6 +85,7 @@ mod tests {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_temp_dir;
use common_time::Timestamp;
use datatypes::prelude::{LogicalTypeId, ScalarVector, ScalarVectorBuilder};
@@ -224,6 +225,7 @@ mod tests {
let SstInfo {
time_range,
file_size,
..
} = writer
.write_sst(&sst::WriteOptions::default())
.await
@@ -411,7 +413,9 @@ mod tests {
.await
.unwrap();
let opts = WriteOptions {};
let opts = WriteOptions {
sst_write_buffer_size: ReadableSize::mb(8),
};
let s1 = ParquetWriter::new(
&output_file_ids[0].as_parquet(),
Source::Reader(reader1),

View File

@@ -16,12 +16,15 @@
use std::time::Duration;
use common_base::readable_size::ReadableSize;
#[derive(Debug, Clone)]
pub struct EngineConfig {
pub manifest_checkpoint_margin: Option<u16>,
pub manifest_gc_duration: Option<Duration>,
pub max_files_in_l0: usize,
pub max_purge_tasks: usize,
pub sst_write_buffer_size: ReadableSize,
}
impl Default for EngineConfig {
@@ -31,6 +34,7 @@ impl Default for EngineConfig {
manifest_gc_duration: Some(Duration::from_secs(30)),
max_files_in_l0: 8,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
}
}
}

View File

@@ -147,7 +147,7 @@ mod tests {
let sst_path = "table1";
let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone()));
let sst_info = layer
.write_sst(sst_file_id, Source::Iter(iter), &WriteOptions {})
.write_sst(sst_file_id, Source::Iter(iter), &WriteOptions::default())
.await
.unwrap()
.unwrap();

View File

@@ -23,6 +23,7 @@ use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
use store_api::storage::SequenceNumber;
use crate::background::{Context, Job, JobHandle, JobPoolRef};
use crate::config::EngineConfig;
use crate::error::{CancelledSnafu, Result};
use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
@@ -174,6 +175,8 @@ pub struct FlushJob<S: LogStore> {
pub manifest: RegionManifest,
/// Callbacks that get invoked on flush success.
pub on_success: Option<FlushCallback>,
/// Storage engine config
pub engine_config: Arc<EngineConfig>,
}
impl<S: LogStore> FlushJob<S> {
@@ -190,6 +193,7 @@ impl<S: LogStore> FlushJob<S> {
batch_size: WRITE_ROW_GROUP_SIZE,
..Default::default()
};
for m in &self.memtables {
// skip empty memtable
if m.num_rows() == 0 {
@@ -200,15 +204,18 @@ impl<S: LogStore> FlushJob<S> {
// TODO(hl): Check if random file name already exists in meta.
let iter = m.iter(&iter_ctx)?;
let sst_layer = self.sst_layer.clone();
let write_options = WriteOptions {
sst_write_buffer_size: self.engine_config.sst_write_buffer_size,
};
futures.push(async move {
Ok(sst_layer
.write_sst(file_id, Source::Iter(iter), &WriteOptions::default())
.write_sst(file_id, Source::Iter(iter), &write_options)
.await?
.map(
|SstInfo {
time_range,
file_size,
..
}| FileMeta {
region_id,
file_id,

View File

@@ -31,7 +31,7 @@ pub mod region;
pub mod scheduler;
pub mod schema;
mod snapshot;
mod sst;
pub mod sst;
mod sync;
#[cfg(test)]
mod test_util;
@@ -41,3 +41,6 @@ pub mod write_batch;
pub use engine::EngineImpl;
mod file_purger;
pub use sst::parquet::ParquetWriter;
pub use sst::Source;

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_error::prelude::BoxedError;
use common_telemetry::tracing::log::info;
use common_telemetry::{error, logging};
@@ -297,8 +298,10 @@ impl RegionWriter {
let mut inner = self.inner.lock().await;
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
inner.manual_compact(writer_ctx, ctx).await
let sst_write_buffer_size = inner.engine_config.sst_write_buffer_size;
inner
.manual_compact(writer_ctx, ctx, sst_write_buffer_size)
.await
}
/// Cancel flush task if any
@@ -648,6 +651,7 @@ impl WriterInner {
wal: ctx.wal.clone(),
manifest: ctx.manifest.clone(),
on_success: cb,
engine_config: self.engine_config.clone(),
};
let flush_handle = ctx
@@ -663,6 +667,7 @@ impl WriterInner {
&mut self,
writer_ctx: WriterContext<'_, S>,
compact_ctx: CompactContext,
sst_write_buffer_size: ReadableSize,
) -> Result<()> {
let region_id = writer_ctx.shared.id();
let mut compaction_request = CompactionRequestImpl {
@@ -674,6 +679,7 @@ impl WriterInner {
wal: writer_ctx.wal.clone(),
ttl: self.ttl,
sender: None,
sst_write_buffer_size,
};
let compaction_scheduler = writer_ctx.compaction_scheduler.clone();
@@ -730,6 +736,7 @@ impl WriterInner {
wal: ctx.wal.clone(),
ttl,
sender: None,
sst_write_buffer_size: config.sst_write_buffer_size,
};
let compaction_scheduler = ctx.compaction_scheduler.clone();
let shared_data = ctx.shared.clone();

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub(crate) mod parquet;
mod stream_writer;
use std::collections::HashMap;
use std::fmt;
@@ -21,9 +22,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_base::readable_size::ReadableSize;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use object_store::{util, ObjectStore};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ResultExt, Snafu};
@@ -32,6 +37,7 @@ use table::predicate::Predicate;
use uuid::Uuid;
use crate::chunk::ChunkReaderImpl;
use crate::error;
use crate::error::{DeleteSstSnafu, Result};
use crate::file_purger::{FilePurgeRequest, FilePurgerRef};
use crate::memtable::BoxedBatchIterator;
@@ -45,6 +51,8 @@ pub const MAX_LEVEL: u8 = 2;
pub type Level = u8;
pub use crate::sst::stream_writer::BufferedWriter;
// We only has fixed number of level, so we use array to hold elements. This implementation
// detail of LevelMetaVec should not be exposed to the user of [LevelMetas].
type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
@@ -383,9 +391,18 @@ where
FileId::from_str(stripped).map_err(<D::Error as serde::de::Error>::custom)
}
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct WriteOptions {
// TODO(yingwen): [flush] row group size.
pub sst_write_buffer_size: ReadableSize,
}
impl Default for WriteOptions {
fn default() -> Self {
Self {
sst_write_buffer_size: ReadableSize::mb(8),
}
}
}
pub struct ReadOptions {
@@ -403,6 +420,7 @@ pub struct ReadOptions {
pub struct SstInfo {
pub time_range: Option<(Timestamp, Timestamp)>,
pub file_size: u64,
pub num_rows: usize,
}
/// SST access layer.
@@ -439,6 +457,8 @@ pub enum Source {
Iter(BoxedBatchIterator),
/// Writes row from ChunkReaderImpl (maybe a set of SSTs) to parquet.
Reader(ChunkReaderImpl),
/// Record batch stream yielded by table scan
Stream(SendableRecordBatchStream),
}
impl Source {
@@ -449,13 +469,23 @@ impl Source {
.next_chunk()
.await
.map(|p| p.map(|chunk| Batch::new(chunk.columns))),
Source::Stream(stream) => stream
.next()
.await
.transpose()
.map(|r| r.map(|r| Batch::new(r.columns().to_vec())))
.context(error::CreateRecordBatchSnafu),
}
}
fn projected_schema(&self) -> ProjectedSchemaRef {
fn schema(&self) -> SchemaRef {
match self {
Source::Iter(iter) => iter.schema(),
Source::Reader(reader) => reader.projected_schema().clone(),
Source::Iter(iter) => {
let projected_schema = iter.schema();
projected_schema.schema_to_read().schema().clone()
}
Source::Reader(reader) => reader.projected_schema().schema_to_read().schema().clone(),
Source::Stream(stream) => stream.schema(),
}
}
}

View File

@@ -27,7 +27,7 @@ use arrow_array::{
use async_compat::CompatExt;
use async_stream::try_stream;
use async_trait::async_trait;
use common_telemetry::error;
use common_telemetry::{error, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
@@ -38,7 +38,7 @@ use datatypes::prelude::ConcreteDataType;
use futures_util::{Stream, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
@@ -48,15 +48,14 @@ use snafu::{OptionExt, ResultExt};
use table::predicate::Predicate;
use tokio::io::BufReader;
use crate::error::{
self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu,
Result, WriteObjectSnafu, WriteParquetSnafu,
};
use crate::error::{self, DecodeParquetTimeRangeSnafu, ReadObjectSnafu, ReadParquetSnafu, Result};
use crate::read::{Batch, BatchReader};
use crate::schema::compat::ReadAdapter;
use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef};
use crate::schema::{ProjectedSchemaRef, StoreSchema};
use crate::sst;
use crate::sst::stream_writer::BufferedWriter;
use crate::sst::{FileHandle, Source, SstInfo};
/// Parquet sst writer.
pub struct ParquetWriter<'a> {
file_path: &'a str,
@@ -75,8 +74,8 @@ impl<'a> ParquetWriter<'a> {
}
}
pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<Option<SstInfo>> {
self.write_rows(None).await
pub async fn write_sst(self, opts: &sst::WriteOptions) -> Result<Option<SstInfo>> {
self.write_rows(None, opts).await
}
/// Iterates memtable and writes rows to Parquet file.
@@ -85,11 +84,9 @@ impl<'a> ParquetWriter<'a> {
async fn write_rows(
mut self,
extra_meta: Option<HashMap<String, String>>,
opts: &sst::WriteOptions,
) -> Result<Option<SstInfo>> {
let projected_schema = self.source.projected_schema();
let store_schema = projected_schema.schema_to_read();
let schema = store_schema.arrow_schema().clone();
let schema = self.source.schema();
let writer_props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
@@ -101,62 +98,48 @@ impl<'a> ParquetWriter<'a> {
}))
.build();
// TODO(hl): Since OpenDAL's writer is async and ArrowWriter requires a `std::io::Write`,
// here we use a Vec<u8> to buffer all parquet bytes in memory and write to object store
// at a time. Maybe we should find a better way to bridge ArrowWriter and OpenDAL's object.
let mut buf = vec![];
let mut arrow_writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;
let mut buffered_writer = BufferedWriter::try_new(
self.file_path.to_string(),
self.object_store.clone(),
&schema,
Some(writer_props),
opts.sst_write_buffer_size.as_bytes() as usize,
)
.await?;
let mut rows_written = 0;
let mut batches_written = 0;
while let Some(batch) = self.source.next_batch().await? {
let arrow_batch = RecordBatch::try_new(
schema.clone(),
batch
.columns()
.iter()
.map(|v| v.to_arrow_array())
.collect::<Vec<_>>(),
)
.context(NewRecordBatchSnafu)?;
arrow_writer
.write(&arrow_batch)
.context(WriteParquetSnafu)?;
batches_written += 1;
buffered_writer.write(&batch).await?;
rows_written += batch.num_rows();
}
if batches_written == 0 {
if rows_written == 0 {
// if the source does not contain any batch, we skip writing an empty parquet file.
if !buffered_writer.abort().await {
warn!(
"Partial file {} has been uploaded to remote storage",
self.file_path
);
}
return Ok(None);
}
let file_meta = arrow_writer.close().context(WriteParquetSnafu)?;
let time_range = decode_timestamp_range(&file_meta, store_schema)
.ok()
.flatten();
let (file_meta, file_size) = buffered_writer.close().await?;
let time_range = decode_timestamp_range(&file_meta, &schema).ok().flatten();
// object_store.write will make sure all bytes are written or an error is raised.
let buf_len = buf.len() as u64;
self.object_store
.write(self.file_path, buf)
.await
.context(WriteObjectSnafu {
path: self.file_path,
})?;
let file_size = buf_len;
Ok(Some(SstInfo {
time_range,
file_size,
num_rows: rows_written,
}))
}
}
fn decode_timestamp_range(
file_meta: &FileMetaData,
store_schema: &StoreSchemaRef,
schema: &datatypes::schema::SchemaRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
let schema = store_schema.schema();
let (Some(ts_col_idx), Some(ts_col)) = (schema.timestamp_index(), schema.timestamp_column()) else { return Ok(None); };
let ts_datatype = &ts_col.data_type;
decode_timestamp_range_inner(file_meta, ts_col_idx, ts_datatype)
@@ -574,6 +557,7 @@ mod tests {
#[tokio::test]
async fn test_parquet_writer() {
common_telemetry::init_default_ut_logging();
let schema = memtable_tests::schema_for_test();
let memtable = DefaultMemtableBuilder::default().build(schema);
@@ -701,6 +685,7 @@ mod tests {
let SstInfo {
time_range,
file_size,
..
} = writer
.write_sst(&sst::WriteOptions::default())
.await
@@ -793,6 +778,7 @@ mod tests {
let SstInfo {
time_range,
file_size,
..
} = writer
.write_sst(&sst::WriteOptions::default())
.await
@@ -906,6 +892,7 @@ mod tests {
let SstInfo {
time_range,
file_size,
..
} = writer
.write_sst(&sst::WriteOptions::default())
.await

View File

@@ -0,0 +1,198 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io::Write;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use arrow_array::RecordBatch;
use bytes::{BufMut, BytesMut};
use datatypes::schema::SchemaRef;
use object_store::{ObjectStore, Writer};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;
use snafu::ResultExt;
use crate::error;
use crate::error::{NewRecordBatchSnafu, WriteObjectSnafu, WriteParquetSnafu};
use crate::read::Batch;
#[derive(Clone, Default)]
struct Buffer {
// It's lightweight since writer/flusher never tries to contend this mutex.
buffer: Arc<Mutex<BytesMut>>,
}
impl Buffer {
pub fn with_capacity(size: usize) -> Self {
Self {
buffer: Arc::new(Mutex::new(BytesMut::with_capacity(size))),
}
}
}
impl Write for Buffer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let len = buf.len();
let mut buffer = self.buffer.lock().unwrap();
buffer.put_slice(buf);
Ok(len)
}
fn flush(&mut self) -> std::io::Result<()> {
// This flush implementation is intentionally left to blank.
// The actual flush is in `BufferedWriter::try_flush`
Ok(())
}
}
/// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
/// storage by chunks to reduce memory consumption.
pub struct BufferedWriter {
path: String,
arrow_writer: ArrowWriter<Buffer>,
object_writer: Writer,
buffer: Buffer,
bytes_written: AtomicU64,
flushed: bool,
threshold: usize,
arrow_schema: arrow::datatypes::SchemaRef,
}
impl BufferedWriter {
pub async fn try_new(
path: String,
store: ObjectStore,
schema: &SchemaRef,
props: Option<WriterProperties>,
buffer_threshold: usize,
) -> error::Result<Self> {
let arrow_schema = schema.arrow_schema();
let buffer = Buffer::with_capacity(buffer_threshold);
let writer = store
.writer(&path)
.await
.context(WriteObjectSnafu { path: &path })?;
let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
.context(WriteParquetSnafu)?;
Ok(Self {
path,
arrow_writer,
object_writer: writer,
buffer,
bytes_written: Default::default(),
flushed: false,
threshold: buffer_threshold,
arrow_schema: arrow_schema.clone(),
})
}
/// Write a record batch to stream writer.
pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
let arrow_batch = RecordBatch::try_new(
self.arrow_schema.clone(),
batch
.columns()
.iter()
.map(|v| v.to_arrow_array())
.collect::<Vec<_>>(),
)
.context(NewRecordBatchSnafu)?;
self.arrow_writer
.write(&arrow_batch)
.context(WriteParquetSnafu)?;
let written = Self::try_flush(
&self.path,
&self.buffer,
&mut self.object_writer,
false,
&mut self.flushed,
self.threshold,
)
.await?;
self.bytes_written.fetch_add(written, Ordering::Relaxed);
Ok(())
}
/// Abort writer.
pub async fn abort(self) -> bool {
// TODO(hl): Currently we can do nothing if file's parts have been uploaded to remote storage
// on abortion, we need to find a way to abort the upload. see https://help.aliyun.com/document_detail/31996.htm?spm=a2c4g.11186623.0.0.3eb42cb7b2mwUz#reference-txp-bvx-wdb
!self.flushed
}
/// Close parquet writer and ensure all buffered data are written into underlying storage.
pub async fn close(mut self) -> error::Result<(FileMetaData, u64)> {
let metadata = self.arrow_writer.close().context(WriteParquetSnafu)?;
let written = Self::try_flush(
&self.path,
&self.buffer,
&mut self.object_writer,
true,
&mut self.flushed,
self.threshold,
)
.await?;
self.bytes_written.fetch_add(written, Ordering::Relaxed);
self.object_writer
.close()
.await
.context(WriteObjectSnafu { path: &self.path })?;
Ok((metadata, self.bytes_written.load(Ordering::Relaxed)))
}
/// Try to flush buffered data to underlying storage if it's size exceeds threshold.
/// Set `all` to true if all buffered data should be flushed regardless of it's size.
async fn try_flush(
file_name: &str,
shared_buffer: &Buffer,
object_writer: &mut Writer,
all: bool,
flushed: &mut bool,
threshold: usize,
) -> error::Result<u64> {
let mut bytes_written = 0;
// Once buffered data size reaches threshold, split the data in chunks (typically 4MB)
// and write to underlying storage.
while shared_buffer.buffer.lock().unwrap().len() >= threshold {
let chunk = {
let mut buffer = shared_buffer.buffer.lock().unwrap();
buffer.split_to(threshold)
};
let size = chunk.len();
object_writer
.append(chunk)
.await
.context(WriteObjectSnafu { path: file_name })?;
*flushed = true;
bytes_written += size;
}
if all {
let remain = shared_buffer.buffer.lock().unwrap().split();
let size = remain.len();
object_writer
.append(remain)
.await
.context(WriteObjectSnafu { path: file_name })?;
*flushed = true;
bytes_written += size;
}
Ok(bytes_written as u64)
}
}

View File

@@ -14,7 +14,7 @@ CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp
Affected Rows: 0
Copy with_filename FROM '/tmp/demo/export/demo.parquet_1_2';
Copy with_filename FROM '/tmp/demo/export/demo.parquet';
Affected Rows: 2

View File

@@ -6,7 +6,7 @@ Copy demo TO '/tmp/demo/export/demo.parquet';
CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
Copy with_filename FROM '/tmp/demo/export/demo.parquet_1_2';
Copy with_filename FROM '/tmp/demo/export/demo.parquet';
select * from with_filename order by ts;