From 0253136333dd8deb2fda05b66dde077da6e5d8a0 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Sat, 1 Apr 2023 17:21:19 +0800
Subject: [PATCH] feat: buffered parquet writer (#1263)

* wip: use

* rebase develop

* chore: fix typos

* feat: replace export parquet writer with buffered writer

* fix: some cr comments

* feat: add sst_write_buffer_size config item to configure how many bytes to buffer before flushing to underlying storage

* chore: rebase onto develop
---
 src/cmd/src/datanode.rs | 2 +
 src/common/base/src/readable_size.rs | 4 +
 src/datanode/src/datanode.rs | 4 +
 src/datanode/src/error.rs | 12 +-
 src/datanode/src/sql/copy_table_to.rs | 110 ++--------
 src/object-store/src/lib.rs | 2 +-
 src/storage/src/compaction/picker.rs | 1 +
 src/storage/src/compaction/scheduler.rs | 3 +
 src/storage/src/compaction/task.rs | 16 +-
 src/storage/src/compaction/writer.rs | 6 +-
 src/storage/src/config.rs | 4 +
 src/storage/src/file_purger.rs | 2 +-
 src/storage/src/flush.rs | 11 +-
 src/storage/src/lib.rs | 5 +-
 src/storage/src/region/writer.rs | 11 +-
 src/storage/src/sst.rs | 38 +++-
 src/storage/src/sst/parquet.rs | 85 ++++----
 src/storage/src/sst/stream_writer.rs | 198 ++++++++++++++++++
 .../cases/standalone/copy/copy_from_fs.result | 2 +-
 tests/cases/standalone/copy/copy_from_fs.sql | 2 +-
 20 files changed, 346 insertions(+), 172 deletions(-)
 create mode 100644 src/storage/src/sst/stream_writer.rs

diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index f7ae79911a..5aba79f81d 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -178,6 +178,7 @@ mod tests {
     use std::io::Write;
     use std::time::Duration;
 
+    use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_named_temp_file;
     use datanode::datanode::{CompactionConfig, ObjectStoreConfig, RegionManifestConfig};
     use servers::Mode;
@@ -266,6 +267,7 @@ mod tests {
                 max_inflight_tasks: 3,
                 max_files_in_level0: 7,
                 max_purge_tasks: 32,
+                sst_write_buffer_size: ReadableSize::mb(8),
             },
             options.storage.compaction,
         );
diff --git a/src/common/base/src/readable_size.rs b/src/common/base/src/readable_size.rs
index ee428539b4..ef25eab5f1 100644
--- a/src/common/base/src/readable_size.rs
+++ b/src/common/base/src/readable_size.rs
@@ -53,6 +53,10 @@ impl ReadableSize {
     pub const fn as_mb(self) -> u64 {
         self.0 / MIB
     }
+
+    pub const fn as_bytes(self) -> u64 {
+        self.0
+    }
 }
 
 impl Div for ReadableSize {
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 482ec882c5..0883f7d4ba 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -150,6 +150,8 @@ pub struct CompactionConfig {
     pub max_files_in_level0: usize,
     /// Max task number for SST purge task after compaction.
     pub max_purge_tasks: usize,
+    /// Buffer threshold while writing SST files.
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl Default for CompactionConfig {
@@ -158,6 +160,7 @@ impl Default for CompactionConfig {
             max_inflight_tasks: 4,
             max_files_in_level0: 8,
             max_purge_tasks: 32,
+            sst_write_buffer_size: ReadableSize::mb(8),
         }
     }
 }
@@ -177,6 +180,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
             manifest_gc_duration: value.storage.manifest.gc_duration,
             max_files_in_l0: value.storage.compaction.max_files_in_level0,
             max_purge_tasks: value.storage.compaction.max_purge_tasks,
+            sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
         }
     }
 }
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 01f9fbc27f..1346dcf35d 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -439,12 +439,6 @@ pub enum Error {
         backtrace: Backtrace,
     },
 
-    #[snafu(display("Failed to write parquet file, source: {}", source))]
-    WriteParquet {
-        source: parquet::errors::ParquetError,
-        backtrace: Backtrace,
-    },
-
    #[snafu(display("Failed to poll stream, source: {}", source))]
    PollStream {
        source: datafusion_common::DataFusionError,
@@ -514,6 +508,12 @@ pub enum Error {
         #[snafu(backtrace)]
         source: BoxedError,
     },
+
+    #[snafu(display("Failed to copy table to parquet file, source: {}", source))]
+    WriteParquet {
+        #[snafu(backtrace)]
+        source: storage::error::Error,
+    },
 }
 
 pub type Result = std::result::Result;
diff --git a/src/datanode/src/sql/copy_table_to.rs b/src/datanode/src/sql/copy_table_to.rs
index 5338f51f4e..98abadbce1 100644
--- a/src/datanode/src/sql/copy_table_to.rs
+++ b/src/datanode/src/sql/copy_table_to.rs
@@ -12,24 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::pin::Pin;
-
 use common_datasource;
 use common_datasource::object_store::{build_backend, parse_url};
 use common_query::physical_plan::SessionContext;
 use common_query::Output;
-use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
-use datafusion::parquet::arrow::ArrowWriter;
-use datafusion::parquet::basic::{Compression, Encoding, ZstdLevel};
-use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::physical_plan::RecordBatchStream;
-use futures::TryStreamExt;
-use object_store::ObjectStore;
 use snafu::ResultExt;
+use storage::sst::SstInfo;
+use storage::{ParquetWriter, Source};
 use table::engine::TableReference;
 use table::requests::CopyTableRequest;
 
-use crate::error::{self, Result};
+use crate::error::{self, Result, WriteParquetSnafu};
 use crate::sql::SqlHandler;
 
 impl SqlHandler {
@@ -51,99 +44,20 @@ impl SqlHandler {
         let stream = stream
             .execute(0, SessionContext::default().task_ctx())
             .context(error::TableScanExecSnafu)?;
-        let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream));
 
         let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
 
         let object_store =
             build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
 
-        let mut parquet_writer = ParquetWriter::new(path.to_string(), stream, object_store);
-        // TODO(jiachun):
-        // For now, COPY is implemented synchronously.
-        // When copying large table, it will be blocked for a long time.
-        // Maybe we should make "copy" runs in background?
-        // Like PG: https://www.postgresql.org/docs/current/sql-copy.html
-        let rows = parquet_writer.flush().await?;
+        let writer = ParquetWriter::new(&path, Source::Stream(stream), object_store);
 
-        Ok(Output::AffectedRows(rows))
-    }
-}
-
-type DfRecordBatchStream = Pin>;
-
-struct ParquetWriter {
-    file_name: String,
-    stream: DfRecordBatchStream,
-    object_store: ObjectStore,
-    max_row_group_size: usize,
-    max_rows_in_segment: usize,
-}
-
-impl ParquetWriter {
-    pub fn new(file_name: String, stream: DfRecordBatchStream, object_store: ObjectStore) -> Self {
-        Self {
-            file_name,
-            stream,
-            object_store,
-            // TODO(jiachun): make these configurable: WITH (max_row_group_size=xxx, max_rows_in_segment=xxx)
-            max_row_group_size: 4096,
-            max_rows_in_segment: 5000000, // default 5M rows per segment
-        }
-    }
-
-    pub async fn flush(&mut self) -> Result {
-        let schema = self.stream.as_ref().schema();
-        let writer_props = WriterProperties::builder()
-            .set_compression(Compression::ZSTD(ZstdLevel::default()))
-            .set_encoding(Encoding::PLAIN)
-            .set_max_row_group_size(self.max_row_group_size)
-            .build();
-        let mut total_rows = 0;
-        loop {
-            let mut buf = vec![];
-            let mut arrow_writer =
-                ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props.clone()))
-                    .context(error::WriteParquetSnafu)?;
-
-            let mut rows = 0;
-            let mut end_loop = true;
-            // TODO(hl & jiachun): Since OpenDAL's writer is async and ArrowWriter requires a `std::io::Write`,
-            // here we use a Vec to buffer all parquet bytes in memory and write to object store
-            // at a time. Maybe we should find a better way to bridge ArrowWriter and OpenDAL's object.
-            while let Some(batch) = self
-                .stream
-                .try_next()
-                .await
-                .context(error::PollStreamSnafu)?
-            {
-                arrow_writer
-                    .write(&batch)
-                    .context(error::WriteParquetSnafu)?;
-                rows += batch.num_rows();
-                if rows >= self.max_rows_in_segment {
-                    end_loop = false;
-                    break;
-                }
-            }
-
-            let start_row_num = total_rows + 1;
-            total_rows += rows;
-            arrow_writer.close().context(error::WriteParquetSnafu)?;
-
-            // if rows == 0, we just end up with an empty file.
-            //
-            // file_name like:
-            // "file_name_1_1000000" (row num: 1 ~ 1000000),
-            // "file_name_1000001_xxx" (row num: 1000001 ~ xxx)
-            let file_name = format!("{}_{}_{}", self.file_name, start_row_num, total_rows);
-            self.object_store
-                .write(&file_name, buf)
-                .await
-                .context(error::WriteObjectSnafu { path: file_name })?;
-
-            if end_loop {
-                return Ok(total_rows);
-            }
-        }
+        let rows_copied = writer
+            .write_sst(&storage::sst::WriteOptions::default())
+            .await
+            .context(WriteParquetSnafu)?
+            .map(|SstInfo { num_rows, .. }| num_rows)
+            .unwrap_or(0);
+
+        Ok(Output::AffectedRows(rows_copied))
     }
 }
diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs
index d39e8ff41c..7cfbb71211 100644
--- a/src/object-store/src/lib.rs
+++ b/src/object-store/src/lib.rs
@@ -16,7 +16,7 @@ pub use opendal::raw::normalize_path as raw_normalize_path;
 pub use opendal::raw::oio::Pager;
 pub use opendal::{
     layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
-    Operator as ObjectStore, Result,
+    Operator as ObjectStore, Result, Writer,
 };
 
 pub mod cache_policy;
diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs
index 966e4ed5af..3cba9e2a36 100644
--- a/src/storage/src/compaction/picker.rs
+++ b/src/storage/src/compaction/picker.rs
@@ -132,6 +132,7 @@ impl Picker for SimplePicker {
             wal: req.wal.clone(),
             manifest: req.manifest.clone(),
             expired_ssts,
+            sst_write_buffer_size: req.sst_write_buffer_size,
         }));
     }
 
diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs
index a574d73050..2e20102dea 100644
--- a/src/storage/src/compaction/scheduler.rs
+++ b/src/storage/src/compaction/scheduler.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use common_base::readable_size::ReadableSize;
 use common_telemetry::{debug, error, info};
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
@@ -60,6 +61,8 @@ pub struct CompactionRequestImpl {
     pub ttl: Option,
     /// Compaction result sender.
     pub sender: Option>>,
+
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl CompactionRequestImpl {
diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs
index b9be241a25..a238f6477c 100644
--- a/src/storage/src/compaction/task.rs
+++ b/src/storage/src/compaction/task.rs
@@ -15,6 +15,7 @@
 use std::collections::HashSet;
 use std::fmt::{Debug, Formatter};
 
+use common_base::readable_size::ReadableSize;
 use common_telemetry::{error, info};
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
@@ -46,6 +47,7 @@ pub struct CompactionTaskImpl {
     pub wal: Wal,
     pub manifest: RegionManifest,
     pub expired_ssts: Vec,
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl Debug for CompactionTaskImpl {
@@ -71,14 +73,14 @@ impl CompactionTaskImpl {
         for output in self.outputs.drain(..) {
             let schema = self.schema.clone();
             let sst_layer = self.sst_layer.clone();
+            let sst_write_buffer_size = self.sst_write_buffer_size;
             compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
 
             // TODO(hl): Maybe spawn to runtime to exploit in-job parallelism.
             futs.push(async move {
-                match output.build(region_id, schema, sst_layer).await {
-                    Ok(meta) => Ok(meta),
-                    Err(e) => Err(e),
-                }
+                output
+                    .build(region_id, schema, sst_layer, sst_write_buffer_size)
+                    .await
             });
         }
 
@@ -172,6 +174,7 @@ impl CompactionOutput {
         region_id: RegionId,
         schema: RegionSchemaRef,
         sst_layer: AccessLayerRef,
+        sst_write_buffer_size: ReadableSize,
     ) -> Result> {
         let reader = build_sst_reader(
             schema,
@@ -183,7 +186,9 @@ impl CompactionOutput {
         .await?;
 
         let output_file_id = FileId::random();
-        let opts = WriteOptions {};
+        let opts = WriteOptions {
+            sst_write_buffer_size,
+        };
 
         Ok(sst_layer
             .write_sst(output_file_id, Source::Reader(reader), &opts)
@@ -192,6 +197,7 @@ impl CompactionOutput {
                 |SstInfo {
                      time_range,
                      file_size,
+                     ..
                 }| FileMeta {
                    region_id,
                    file_id: output_file_id,
diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs
index f30e38d941..4f809b4938 100644
--- a/src/storage/src/compaction/writer.rs
+++ b/src/storage/src/compaction/writer.rs
@@ -85,6 +85,7 @@ mod tests {
     use std::sync::atomic::{AtomicU64, Ordering};
     use std::sync::Arc;
 
+    use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_temp_dir;
     use common_time::Timestamp;
     use datatypes::prelude::{LogicalTypeId, ScalarVector, ScalarVectorBuilder};
@@ -224,6 +225,7 @@ mod tests {
         let SstInfo {
             time_range,
             file_size,
+            ..
         } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
@@ -411,7 +413,9 @@ mod tests {
         .await
         .unwrap();
 
-        let opts = WriteOptions {};
+        let opts = WriteOptions {
+            sst_write_buffer_size: ReadableSize::mb(8),
+        };
         let s1 = ParquetWriter::new(
             &output_file_ids[0].as_parquet(),
             Source::Reader(reader1),
diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs
index 51b3185ae6..1e74826d6d 100644
--- a/src/storage/src/config.rs
+++ b/src/storage/src/config.rs
@@ -16,12 +16,15 @@
 use std::time::Duration;
 
+use common_base::readable_size::ReadableSize;
+
 #[derive(Debug, Clone)]
 pub struct EngineConfig {
     pub manifest_checkpoint_margin: Option,
     pub manifest_gc_duration: Option,
     pub max_files_in_l0: usize,
     pub max_purge_tasks: usize,
+    pub sst_write_buffer_size: ReadableSize,
 }
 
 impl Default for EngineConfig {
@@ -31,6 +34,7 @@ impl Default for EngineConfig {
             manifest_gc_duration: Some(Duration::from_secs(30)),
             max_files_in_l0: 8,
             max_purge_tasks: 32,
+            sst_write_buffer_size: ReadableSize::mb(8),
         }
     }
 }
diff --git a/src/storage/src/file_purger.rs b/src/storage/src/file_purger.rs
index 7f5d47de18..791c350d27 100644
--- a/src/storage/src/file_purger.rs
+++ b/src/storage/src/file_purger.rs
@@ -147,7 +147,7 @@ mod tests {
         let sst_path = "table1";
         let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone()));
         let sst_info = layer
-            .write_sst(sst_file_id, Source::Iter(iter), &WriteOptions {})
+            .write_sst(sst_file_id, Source::Iter(iter), &WriteOptions::default())
             .await
             .unwrap()
             .unwrap();
diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs
index 0759b79d49..d4d87b2b74 100644
--- a/src/storage/src/flush.rs
+++ b/src/storage/src/flush.rs
@@ -23,6 +23,7 @@ use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
 use store_api::storage::SequenceNumber;
 
 use crate::background::{Context, Job, JobHandle, JobPoolRef};
+use crate::config::EngineConfig;
 use crate::error::{CancelledSnafu, Result};
 use crate::manifest::action::*;
 use crate::manifest::region::RegionManifest;
@@ -174,6 +175,8 @@ pub struct FlushJob {
     pub manifest: RegionManifest,
     /// Callbacks that get invoked on flush success.
     pub on_success: Option,
+    /// Storage engine config.
+    pub engine_config: Arc,
 }
 
 impl FlushJob {
@@ -190,6 +193,7 @@ impl FlushJob {
             batch_size: WRITE_ROW_GROUP_SIZE,
             ..Default::default()
         };
+
         for m in &self.memtables {
             // skip empty memtable
             if m.num_rows() == 0 {
@@ -200,15 +204,18 @@ impl FlushJob {
             // TODO(hl): Check if random file name already exists in meta.
             let iter = m.iter(&iter_ctx)?;
             let sst_layer = self.sst_layer.clone();
-
+            let write_options = WriteOptions {
+                sst_write_buffer_size: self.engine_config.sst_write_buffer_size,
+            };
             futures.push(async move {
                 Ok(sst_layer
-                    .write_sst(file_id, Source::Iter(iter), &WriteOptions::default())
+                    .write_sst(file_id, Source::Iter(iter), &write_options)
                    .await?
                    .map(
                        |SstInfo {
                             time_range,
                             file_size,
+                            ..
                         }| FileMeta {
                            region_id,
                            file_id,
diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs
index 85f00644da..0a42d66f32 100644
--- a/src/storage/src/lib.rs
+++ b/src/storage/src/lib.rs
@@ -31,7 +31,7 @@ pub mod region;
 pub mod scheduler;
 pub mod schema;
 mod snapshot;
-mod sst;
+pub mod sst;
 mod sync;
 #[cfg(test)]
 mod test_util;
@@ -41,3 +41,6 @@ pub mod write_batch;
 pub use engine::EngineImpl;
 
 mod file_purger;
+
+pub use sst::parquet::ParquetWriter;
+pub use sst::Source;
diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs
index b5b1a3a08a..23a79753da 100644
--- a/src/storage/src/region/writer.rs
+++ b/src/storage/src/region/writer.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use common_base::readable_size::ReadableSize;
 use common_error::prelude::BoxedError;
 use common_telemetry::tracing::log::info;
 use common_telemetry::{error, logging};
@@ -297,8 +298,10 @@ impl RegionWriter {
         let mut inner = self.inner.lock().await;
 
         ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
-
-        inner.manual_compact(writer_ctx, ctx).await
+        let sst_write_buffer_size = inner.engine_config.sst_write_buffer_size;
+        inner
+            .manual_compact(writer_ctx, ctx, sst_write_buffer_size)
+            .await
     }
 
     /// Cancel flush task if any
@@ -648,6 +651,7 @@ impl WriterInner {
             wal: ctx.wal.clone(),
             manifest: ctx.manifest.clone(),
             on_success: cb,
+            engine_config: self.engine_config.clone(),
         };
 
         let flush_handle = ctx
@@ -663,6 +667,7 @@ impl WriterInner {
         &mut self,
         writer_ctx: WriterContext<'_, S>,
         compact_ctx: CompactContext,
+        sst_write_buffer_size: ReadableSize,
     ) -> Result<()> {
         let region_id = writer_ctx.shared.id();
         let mut compaction_request = CompactionRequestImpl {
@@ -674,6 +679,7 @@ impl WriterInner {
             wal: writer_ctx.wal.clone(),
             ttl: self.ttl,
             sender: None,
+            sst_write_buffer_size,
         };
 
         let compaction_scheduler = writer_ctx.compaction_scheduler.clone();
@@ -730,6 +736,7 @@ impl WriterInner {
             wal: ctx.wal.clone(),
             ttl,
             sender: None,
+            sst_write_buffer_size: config.sst_write_buffer_size,
         };
         let compaction_scheduler = ctx.compaction_scheduler.clone();
         let shared_data = ctx.shared.clone();
diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs
index a4b9d1894e..3e56087a65 100644
--- a/src/storage/src/sst.rs
+++ b/src/storage/src/sst.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 pub(crate) mod parquet;
+mod stream_writer;
 
 use std::collections::HashMap;
 use std::fmt;
@@ -21,9 +22,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_base::readable_size::ReadableSize;
+use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::{error, info};
 use common_time::range::TimestampRange;
 use common_time::Timestamp;
+use datatypes::schema::SchemaRef;
+use futures_util::StreamExt;
 use object_store::{util, ObjectStore};
 use serde::{Deserialize, Deserializer, Serialize};
 use snafu::{ResultExt, Snafu};
@@ -32,6 +37,7 @@ use table::predicate::Predicate;
 use uuid::Uuid;
 
 use crate::chunk::ChunkReaderImpl;
+use crate::error;
 use crate::error::{DeleteSstSnafu, Result};
 use crate::file_purger::{FilePurgeRequest, FilePurgerRef};
 use crate::memtable::BoxedBatchIterator;
@@ -45,6 +51,8 @@ pub const MAX_LEVEL: u8 = 2;
 
 pub type Level = u8;
 
+pub use crate::sst::stream_writer::BufferedWriter;
+
 // We only has fixed number of level, so we use array to hold elements. This implementation
 // detail of LevelMetaVec should not be exposed to the user of [LevelMetas].
 type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
@@ -383,9 +391,18 @@ where
     FileId::from_str(stripped).map_err(::custom)
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct WriteOptions {
     // TODO(yingwen): [flush] row group size.
+    pub sst_write_buffer_size: ReadableSize,
+}
+
+impl Default for WriteOptions {
+    fn default() -> Self {
+        Self {
+            sst_write_buffer_size: ReadableSize::mb(8),
+        }
+    }
 }
 
 pub struct ReadOptions {
@@ -403,6 +420,7 @@
 pub struct SstInfo {
     pub time_range: Option<(Timestamp, Timestamp)>,
     pub file_size: u64,
+    pub num_rows: usize,
 }
 
 /// SST access layer.
@@ -439,6 +457,8 @@ pub enum Source {
     Iter(BoxedBatchIterator),
     /// Writes row from ChunkReaderImpl (maybe a set of SSTs) to parquet.
     Reader(ChunkReaderImpl),
+    /// Record batch stream yielded by table scan
+    Stream(SendableRecordBatchStream),
 }
 
 impl Source {
@@ -449,13 +469,23 @@ impl Source {
                 .next_chunk()
                 .await
                 .map(|p| p.map(|chunk| Batch::new(chunk.columns))),
+            Source::Stream(stream) => stream
+                .next()
+                .await
+                .transpose()
+                .map(|r| r.map(|r| Batch::new(r.columns().to_vec())))
+                .context(error::CreateRecordBatchSnafu),
         }
     }
 
-    fn projected_schema(&self) -> ProjectedSchemaRef {
+    fn schema(&self) -> SchemaRef {
         match self {
-            Source::Iter(iter) => iter.schema(),
-            Source::Reader(reader) => reader.projected_schema().clone(),
+            Source::Iter(iter) => {
+                let projected_schema = iter.schema();
+                projected_schema.schema_to_read().schema().clone()
+            }
+            Source::Reader(reader) => reader.projected_schema().schema_to_read().schema().clone(),
+            Source::Stream(stream) => stream.schema(),
         }
     }
 }
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index acce5516ba..cfa64b81bb 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -27,7 +27,7 @@ use arrow_array::{
 use async_compat::CompatExt;
 use async_stream::try_stream;
 use async_trait::async_trait;
-use common_telemetry::error;
+use common_telemetry::{error, warn};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
@@ -38,7 +38,7 @@ use datatypes::prelude::ConcreteDataType;
 use futures_util::{Stream, StreamExt, TryStreamExt};
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
-use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::basic::{Compression, Encoding, ZstdLevel};
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::WriterProperties;
@@ -48,15 +48,14 @@ use snafu::{OptionExt, ResultExt};
 use table::predicate::Predicate;
 use tokio::io::BufReader;
 
-use crate::error::{
-    self, DecodeParquetTimeRangeSnafu, NewRecordBatchSnafu, ReadObjectSnafu, ReadParquetSnafu,
-    Result, WriteObjectSnafu, WriteParquetSnafu,
-};
+use crate::error::{self, DecodeParquetTimeRangeSnafu, ReadObjectSnafu, ReadParquetSnafu, Result};
 use crate::read::{Batch, BatchReader};
 use crate::schema::compat::ReadAdapter;
-use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef};
+use crate::schema::{ProjectedSchemaRef, StoreSchema};
 use crate::sst;
+use crate::sst::stream_writer::BufferedWriter;
 use crate::sst::{FileHandle, Source, SstInfo};
+
 /// Parquet sst writer.
 pub struct ParquetWriter<'a> {
     file_path: &'a str,
@@ -75,8 +74,8 @@ impl<'a> ParquetWriter<'a> {
         }
     }
 
-    pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result> {
-        self.write_rows(None).await
+    pub async fn write_sst(self, opts: &sst::WriteOptions) -> Result> {
+        self.write_rows(None, opts).await
     }
 
     /// Iterates memtable and writes rows to Parquet file.
@@ -85,11 +84,9 @@ impl<'a> ParquetWriter<'a> {
     async fn write_rows(
         mut self,
         extra_meta: Option>,
+        opts: &sst::WriteOptions,
     ) -> Result> {
-        let projected_schema = self.source.projected_schema();
-        let store_schema = projected_schema.schema_to_read();
-        let schema = store_schema.arrow_schema().clone();
-
+        let schema = self.source.schema();
         let writer_props = WriterProperties::builder()
             .set_compression(Compression::ZSTD(ZstdLevel::default()))
             .set_encoding(Encoding::PLAIN)
@@ -101,62 +98,48 @@ impl<'a> ParquetWriter<'a> {
             }))
             .build();
 
-        // TODO(hl): Since OpenDAL's writer is async and ArrowWriter requires a `std::io::Write`,
-        // here we use a Vec to buffer all parquet bytes in memory and write to object store
-        // at a time. Maybe we should find a better way to bridge ArrowWriter and OpenDAL's object.
-        let mut buf = vec![];
-        let mut arrow_writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props))
-            .context(WriteParquetSnafu)?;
+        let mut buffered_writer = BufferedWriter::try_new(
+            self.file_path.to_string(),
+            self.object_store.clone(),
+            &schema,
+            Some(writer_props),
+            opts.sst_write_buffer_size.as_bytes() as usize,
+        )
+        .await?;
+        let mut rows_written = 0;
 
-        let mut batches_written = 0;
         while let Some(batch) = self.source.next_batch().await? {
-            let arrow_batch = RecordBatch::try_new(
-                schema.clone(),
-                batch
-                    .columns()
-                    .iter()
-                    .map(|v| v.to_arrow_array())
-                    .collect::>(),
-            )
-            .context(NewRecordBatchSnafu)?;
-            arrow_writer
-                .write(&arrow_batch)
-                .context(WriteParquetSnafu)?;
-            batches_written += 1;
+            buffered_writer.write(&batch).await?;
+            rows_written += batch.num_rows();
         }
 
-        if batches_written == 0 {
+        if rows_written == 0 {
             // if the source does not contain any batch, we skip writing an empty parquet file.
+            if !buffered_writer.abort().await {
+                warn!(
+                    "Partial file {} has been uploaded to remote storage",
+                    self.file_path
+                );
+            }
             return Ok(None);
         }
 
-        let file_meta = arrow_writer.close().context(WriteParquetSnafu)?;
-        let time_range = decode_timestamp_range(&file_meta, store_schema)
-            .ok()
-            .flatten();
+        let (file_meta, file_size) = buffered_writer.close().await?;
+        let time_range = decode_timestamp_range(&file_meta, &schema).ok().flatten();
 
         // object_store.write will make sure all bytes are written or an error is raised.
-        let buf_len = buf.len() as u64;
-        self.object_store
-            .write(self.file_path, buf)
-            .await
-            .context(WriteObjectSnafu {
-                path: self.file_path,
-            })?;
-
-        let file_size = buf_len;
         Ok(Some(SstInfo {
             time_range,
             file_size,
+            num_rows: rows_written,
         }))
     }
 }
 
 fn decode_timestamp_range(
     file_meta: &FileMetaData,
-    store_schema: &StoreSchemaRef,
+    schema: &datatypes::schema::SchemaRef,
 ) -> Result> {
-    let schema = store_schema.schema();
     let (Some(ts_col_idx), Some(ts_col)) = (schema.timestamp_index(), schema.timestamp_column()) else { return Ok(None); };
     let ts_datatype = &ts_col.data_type;
     decode_timestamp_range_inner(file_meta, ts_col_idx, ts_datatype)
@@ -574,6 +557,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_parquet_writer() {
+        common_telemetry::init_default_ut_logging();
         let schema = memtable_tests::schema_for_test();
         let memtable = DefaultMemtableBuilder::default().build(schema);
 
@@ -701,6 +685,7 @@ mod tests {
         let SstInfo {
             time_range,
             file_size,
+            ..
         } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
@@ -793,6 +778,7 @@ mod tests {
         let SstInfo {
             time_range,
             file_size,
+            ..
         } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
@@ -906,6 +892,7 @@ mod tests {
         let SstInfo {
             time_range,
             file_size,
+            ..
         } = writer
             .write_sst(&sst::WriteOptions::default())
             .await
diff --git a/src/storage/src/sst/stream_writer.rs b/src/storage/src/sst/stream_writer.rs
new file mode 100644
index 0000000000..b58ba076b0
--- /dev/null
+++ b/src/storage/src/sst/stream_writer.rs
@@ -0,0 +1,198 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io::Write;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, Mutex};
+
+use arrow_array::RecordBatch;
+use bytes::{BufMut, BytesMut};
+use datatypes::schema::SchemaRef;
+use object_store::{ObjectStore, Writer};
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use parquet::format::FileMetaData;
+use snafu::ResultExt;
+
+use crate::error;
+use crate::error::{NewRecordBatchSnafu, WriteObjectSnafu, WriteParquetSnafu};
+use crate::read::Batch;
+
+#[derive(Clone, Default)]
+struct Buffer {
+    // It's lightweight since writer/flusher never tries to contend this mutex.
+    buffer: Arc>,
+}
+
+impl Buffer {
+    pub fn with_capacity(size: usize) -> Self {
+        Self {
+            buffer: Arc::new(Mutex::new(BytesMut::with_capacity(size))),
+        }
+    }
+}
+
+impl Write for Buffer {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result {
+        let len = buf.len();
+        let mut buffer = self.buffer.lock().unwrap();
+        buffer.put_slice(buf);
+        Ok(len)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        // This flush implementation is intentionally left blank.
+        // The actual flush is in `BufferedWriter::try_flush`.
+        Ok(())
+    }
+}
+
+/// Parquet writer that buffers row groups in memory and writes buffered data to an underlying
+/// storage by chunks to reduce memory consumption.
+pub struct BufferedWriter {
+    path: String,
+    arrow_writer: ArrowWriter,
+    object_writer: Writer,
+    buffer: Buffer,
+    bytes_written: AtomicU64,
+    flushed: bool,
+    threshold: usize,
+    arrow_schema: arrow::datatypes::SchemaRef,
+}
+
+impl BufferedWriter {
+    pub async fn try_new(
+        path: String,
+        store: ObjectStore,
+        schema: &SchemaRef,
+        props: Option,
+        buffer_threshold: usize,
+    ) -> error::Result {
+        let arrow_schema = schema.arrow_schema();
+        let buffer = Buffer::with_capacity(buffer_threshold);
+        let writer = store
+            .writer(&path)
+            .await
+            .context(WriteObjectSnafu { path: &path })?;
+
+        let arrow_writer = ArrowWriter::try_new(buffer.clone(), arrow_schema.clone(), props)
+            .context(WriteParquetSnafu)?;
+
+        Ok(Self {
+            path,
+            arrow_writer,
+            object_writer: writer,
+            buffer,
+            bytes_written: Default::default(),
+            flushed: false,
+            threshold: buffer_threshold,
+            arrow_schema: arrow_schema.clone(),
+        })
+    }
+
+    /// Write a record batch to stream writer.
+    pub async fn write(&mut self, batch: &Batch) -> error::Result<()> {
+        let arrow_batch = RecordBatch::try_new(
+            self.arrow_schema.clone(),
+            batch
+                .columns()
+                .iter()
+                .map(|v| v.to_arrow_array())
+                .collect::>(),
+        )
+        .context(NewRecordBatchSnafu)?;
+        self.arrow_writer
+            .write(&arrow_batch)
+            .context(WriteParquetSnafu)?;
+        let written = Self::try_flush(
+            &self.path,
+            &self.buffer,
+            &mut self.object_writer,
+            false,
+            &mut self.flushed,
+            self.threshold,
+        )
+        .await?;
+        self.bytes_written.fetch_add(written, Ordering::Relaxed);
+        Ok(())
+    }
+
+    /// Abort writer.
+    pub async fn abort(self) -> bool {
+        // TODO(hl): Currently we can do nothing if the file's parts have already been uploaded to remote storage
+        // on abort; we need to find a way to abort the upload. See https://help.aliyun.com/document_detail/31996.htm?spm=a2c4g.11186623.0.0.3eb42cb7b2mwUz#reference-txp-bvx-wdb
+        !self.flushed
+    }
+
+    /// Close parquet writer and ensure all buffered data are written into underlying storage.
+    pub async fn close(mut self) -> error::Result<(FileMetaData, u64)> {
+        let metadata = self.arrow_writer.close().context(WriteParquetSnafu)?;
+        let written = Self::try_flush(
+            &self.path,
+            &self.buffer,
+            &mut self.object_writer,
+            true,
+            &mut self.flushed,
+            self.threshold,
+        )
+        .await?;
+        self.bytes_written.fetch_add(written, Ordering::Relaxed);
+        self.object_writer
+            .close()
+            .await
+            .context(WriteObjectSnafu { path: &self.path })?;
+        Ok((metadata, self.bytes_written.load(Ordering::Relaxed)))
+    }
+
+    /// Try to flush buffered data to underlying storage if its size exceeds the threshold.
+    /// Set `all` to true if all buffered data should be flushed regardless of its size.
+    async fn try_flush(
+        file_name: &str,
+        shared_buffer: &Buffer,
+        object_writer: &mut Writer,
+        all: bool,
+        flushed: &mut bool,
+        threshold: usize,
+    ) -> error::Result {
+        let mut bytes_written = 0;
+
+        // Once buffered data size reaches threshold, split the data in chunks (typically 4MB)
+        // and write to underlying storage.
+        while shared_buffer.buffer.lock().unwrap().len() >= threshold {
+            let chunk = {
+                let mut buffer = shared_buffer.buffer.lock().unwrap();
+                buffer.split_to(threshold)
+            };
+            let size = chunk.len();
+            object_writer
+                .append(chunk)
+                .await
+                .context(WriteObjectSnafu { path: file_name })?;
+            *flushed = true;
+            bytes_written += size;
+        }
+
+        if all {
+            let remain = shared_buffer.buffer.lock().unwrap().split();
+            let size = remain.len();
+            object_writer
+                .append(remain)
+                .await
+                .context(WriteObjectSnafu { path: file_name })?;
+            *flushed = true;
+            bytes_written += size;
+        }
+        Ok(bytes_written as u64)
+    }
+}
diff --git a/tests/cases/standalone/copy/copy_from_fs.result b/tests/cases/standalone/copy/copy_from_fs.result
index 0aae287308..0662e9c759 100644
--- a/tests/cases/standalone/copy/copy_from_fs.result
+++ b/tests/cases/standalone/copy/copy_from_fs.result
@@ -14,7 +14,7 @@ CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp
 
 Affected Rows: 0
 
-Copy with_filename FROM '/tmp/demo/export/demo.parquet_1_2';
+Copy with_filename FROM '/tmp/demo/export/demo.parquet';
 
 Affected Rows: 2
 
diff --git a/tests/cases/standalone/copy/copy_from_fs.sql b/tests/cases/standalone/copy/copy_from_fs.sql
index 4c6fc2910d..d7b924021e 100644
--- a/tests/cases/standalone/copy/copy_from_fs.sql
+++ b/tests/cases/standalone/copy/copy_from_fs.sql
@@ -6,7 +6,7 @@ Copy demo TO '/tmp/demo/export/demo.parquet';
 
 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_filename FROM '/tmp/demo/export/demo.parquet_1_2';
+Copy with_filename FROM '/tmp/demo/export/demo.parquet';
 
 select * from with_filename order by ts;