From f9abd256b7fe2252af55ff36ba4810fcfa217599 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 19 Jan 2023 12:36:06 +0800 Subject: [PATCH] add ip addr to columnar (#1805) --- columnar/src/column/mod.rs | 5 +- columnar/src/column/serialize.rs | 51 +++++++- columnar/src/column_index/serialize.rs | 12 +- columnar/src/column_values/mod.rs | 17 ++- .../src/column_values/monotonic_mapping.rs | 28 ++--- columnar/src/column_values/serialize.rs | 38 +++--- columnar/src/columnar/column_type.rs | 13 ++ .../src/columnar/writer/column_operation.rs | 22 +++- columnar/src/columnar/writer/mod.rs | 116 +++++++++++++++++- columnar/src/dynamic_column.rs | 11 +- columnar/src/tests.rs | 31 +++++ 11 files changed, 285 insertions(+), 59 deletions(-) diff --git a/columnar/src/column/mod.rs b/columnar/src/column/mod.rs index 1c8830c08..ca31b7b22 100644 --- a/columnar/src/column/mod.rs +++ b/columnar/src/column/mod.rs @@ -6,7 +6,10 @@ use std::sync::Arc; use common::BinarySerializable; pub use dictionary_encoded::BytesColumn; -pub use serialize::{open_column_bytes, open_column_u64, serialize_column_u64}; +pub use serialize::{ + open_column_bytes, open_column_u128, open_column_u64, serialize_column_u128, + serialize_column_u64, +}; use crate::column_index::ColumnIndex; use crate::column_values::ColumnValues; diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 79adce6d6..4c619cd3b 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -2,23 +2,43 @@ use std::io; use std::io::Write; use std::sync::Arc; -use common::{CountingWriter, OwnedBytes}; +use common::OwnedBytes; use sstable::Dictionary; use crate::column::{BytesColumn, Column}; use crate::column_index::{serialize_column_index, SerializableColumnIndex}; +use crate::column_values::serialize::serialize_column_values_u128; use crate::column_values::{ - serialize_column_values, ColumnValues, MonotonicallyMappableToU64, ALL_CODEC_TYPES, + serialize_column_values, ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, + ALL_CODEC_TYPES, }; + +pub fn serialize_column_u128< + F: Fn() -> I, + I: Iterator, + T: MonotonicallyMappableToU128, +>( + column_index: SerializableColumnIndex<'_>, + column_values: F, + num_vals: u32, + output: &mut impl Write, +) -> io::Result<()> { + let column_index_num_bytes = serialize_column_index(column_index, output)?; + serialize_column_values_u128( + || column_values().map(|val| val.to_u128()), + num_vals, + output, + )?; + output.write_all(&column_index_num_bytes.to_le_bytes())?; + Ok(()) +} + pub fn serialize_column_u64( column_index: SerializableColumnIndex<'_>, column_values: &impl ColumnValues, output: &mut impl Write, ) -> io::Result<()> { - let mut counting_writer = CountingWriter::wrap(output); - serialize_column_index(column_index, &mut counting_writer)?; - let column_index_num_bytes = counting_writer.written_bytes() as u32; - let output = counting_writer.finish(); + let column_index_num_bytes = serialize_column_index(column_index, output)?; serialize_column_values(column_values, &ALL_CODEC_TYPES[..], output)?; output.write_all(&column_index_num_bytes.to_le_bytes())?; Ok(()) @@ -41,6 +61,25 @@ pub fn open_column_u64(bytes: OwnedBytes) -> io:: }) } +pub fn open_column_u128( + bytes: OwnedBytes, +) -> io::Result> { + let (body, column_index_num_bytes_payload) = bytes.rsplit(4); + let column_index_num_bytes = u32::from_le_bytes( + column_index_num_bytes_payload + .as_slice() + .try_into() + .unwrap(), + ); + let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize); + let column_index = crate::column_index::open_column_index(column_index_data)?; + let column_values = crate::column_values::open_u128_mapped(column_values_data)?; + Ok(Column { + idx: column_index, + values: column_values, + }) +} + pub fn open_column_bytes(data: OwnedBytes) -> io::Result { let (body, dictionary_len_bytes) = data.rsplit(4); let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index bc9168bf3..7a4b04539 100644 --- a/columnar/src/column_index/serialize.rs +++ b/columnar/src/column_index/serialize.rs @@ -1,7 +1,7 @@ use std::io; use std::io::Write; -use common::OwnedBytes; +use common::{CountingWriter, OwnedBytes}; use crate::column_index::multivalued_index::{serialize_multivalued_index, MultivaluedIndex}; use crate::column_index::optional_index::serialize_optional_index; @@ -29,19 +29,21 @@ impl<'a> SerializableColumnIndex<'a> { pub fn serialize_column_index( column_index: SerializableColumnIndex, output: &mut impl Write, -) -> io::Result<()> { +) -> io::Result { + let mut output = CountingWriter::wrap(output); let cardinality = column_index.get_cardinality().to_code(); output.write_all(&[cardinality])?; match column_index { SerializableColumnIndex::Full => {} SerializableColumnIndex::Optional(optional_index) => { - serialize_optional_index(&*optional_index, output)? + serialize_optional_index(&*optional_index, &mut output)? } SerializableColumnIndex::Multivalued(multivalued_index) => { - serialize_multivalued_index(multivalued_index, output)? + serialize_multivalued_index(multivalued_index, &mut output)? } } - Ok(()) + let column_index_num_bytes = output.written_bytes() as u32; + Ok(column_index_num_bytes) } pub fn open_column_index(mut bytes: OwnedBytes) -> io::Result> { diff --git a/columnar/src/column_values/mod.rs b/columnar/src/column_values/mod.rs index c6e793b3a..b0338c40c 100644 --- a/columnar/src/column_values/mod.rs +++ b/columnar/src/column_values/mod.rs @@ -28,7 +28,7 @@ mod compact_space; mod line; mod linear; pub(crate) mod monotonic_mapping; -// mod monotonic_mapping_u128; +pub(crate) mod monotonic_mapping_u128; mod column; mod column_with_cardinality; @@ -37,7 +37,7 @@ pub mod serialize; pub use self::column::{monotonic_map_column, ColumnValues, IterColumn, VecColumn}; pub use self::monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn}; -// pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128; +pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128; pub use self::serialize::{serialize_and_load, serialize_column_values, NormalizedHeader}; use crate::column_values::bitpacked::BitpackedCodec; use crate::column_values::blockwise_linear::BlockwiseLinearCodec; @@ -136,6 +136,19 @@ impl U128FastFieldCodecType { // // Ok(Arc::new(monotonic_map_column(reader, inverted))) // } +/// Returns the correct codec reader wrapped in the `Arc` for the data. +pub fn open_u128_mapped( + mut bytes: OwnedBytes, +) -> io::Result>> { + let header = U128Header::deserialize(&mut bytes)?; + assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); + let reader = CompactSpaceDecompressor::open(bytes)?; + + let inverted: StrictlyMonotonicMappingInverter> = + StrictlyMonotonicMappingToInternal::::new().into(); + Ok(Arc::new(monotonic_map_column(reader, inverted))) +} + /// Returns the correct codec reader wrapped in the `Arc` for the data. pub fn open_u64_mapped( mut bytes: OwnedBytes, diff --git a/columnar/src/column_values/monotonic_mapping.rs b/columnar/src/column_values/monotonic_mapping.rs index 10bb27319..f44e9bd9e 100644 --- a/columnar/src/column_values/monotonic_mapping.rs +++ b/columnar/src/column_values/monotonic_mapping.rs @@ -2,6 +2,7 @@ use std::marker::PhantomData; use fastdivide::DividerU64; +use super::MonotonicallyMappableToU128; use crate::RowId; /// Monotonic maps a value to u64 value space. @@ -80,21 +81,20 @@ impl StrictlyMonotonicMappingToInternal { } } -// TODO -// impl -// StrictlyMonotonicFn for StrictlyMonotonicMappingToInternal -// where T: MonotonicallyMappableToU128 -// { -// #[inline(always)] -// fn mapping(&self, inp: External) -> u128 { -// External::to_u128(inp) -// } +impl + StrictlyMonotonicFn for StrictlyMonotonicMappingToInternal +where T: MonotonicallyMappableToU128 +{ + #[inline(always)] + fn mapping(&self, inp: External) -> u128 { + External::to_u128(inp) + } -// #[inline(always)] -// fn inverse(&self, out: u128) -> External { -// External::from_u128(out) -// } -// } + #[inline(always)] + fn inverse(&self, out: u128) -> External { + External::from_u128(out) + } +} impl StrictlyMonotonicFn for StrictlyMonotonicMappingToInternal diff --git a/columnar/src/column_values/serialize.rs b/columnar/src/column_values/serialize.rs index 57c108280..d086e08f3 100644 --- a/columnar/src/column_values/serialize.rs +++ b/columnar/src/column_values/serialize.rs @@ -35,6 +35,7 @@ use super::{ monotonic_map_column, ColumnValues, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64, U128FastFieldCodecType, VecColumn, ALL_CODEC_TYPES, }; +use crate::column_values::compact_space::CompactSpaceCompressor; /// The normalized header gives some parameters after applying the following /// normalization of the vector: @@ -182,32 +183,23 @@ pub(crate) fn estimate( } } -// TODO /// Serializes u128 values with the compact space codec. -// pub fn serialize_u128_new I, I: Iterator>( -// value_index: ColumnIndex, -// iter_gen: F, -// num_vals: u32, -// output: &mut impl io::Write, -// ) -> io::Result<()> { -// let header = U128Header { -// num_vals, -// codec_type: U128FastFieldCodecType::CompactSpace, -// }; -// header.serialize(output)?; -// let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals); -// compressor.compress_into(iter_gen(), output).unwrap(); +pub fn serialize_column_values_u128 I, I: Iterator>( + iter_gen: F, + num_vals: u32, + output: &mut impl io::Write, +) -> io::Result<()> { + let header = U128Header { + num_vals, + codec_type: U128FastFieldCodecType::CompactSpace, + }; + header.serialize(output)?; -// let null_index_footer = ColumnFooter { -// cardinality: value_index.get_cardinality(), -// null_index_codec: NullIndexCodec::Full, -// null_index_byte_range: 0..0, -// }; -// append_null_index_footer(output, null_index_footer)?; -// append_format_version(output)?; + let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals); + compressor.compress_into(iter_gen(), output)?; -// Ok(()) -// } + Ok(()) +} /// Serializes the column with the codec with the best estimate on the data. pub fn serialize_column_values( diff --git a/columnar/src/columnar/column_type.rs b/columnar/src/columnar/column_type.rs index 6d851ed38..7b7c755c7 100644 --- a/columnar/src/columnar/column_type.rs +++ b/columnar/src/columnar/column_type.rs @@ -11,6 +11,7 @@ pub enum ColumnType { Bytes, Numerical(NumericalType), Bool, + IpAddr, } impl ColumnType { @@ -31,6 +32,10 @@ impl ColumnType { column_type_category = ColumnTypeCategory::Bool; numerical_type_code = 0u8; } + ColumnType::IpAddr => { + column_type_category = ColumnTypeCategory::IpAddr; + numerical_type_code = 0u8; + } } place_bits::<0, 3>(column_type_category.to_code()) | place_bits::<3, 6>(numerical_type_code) } @@ -49,6 +54,12 @@ impl ColumnType { } Ok(ColumnType::Bool) } + ColumnTypeCategory::IpAddr => { + if numerical_type_code != 0u8 { + return Err(InvalidData); + } + Ok(ColumnType::IpAddr) + } ColumnTypeCategory::Str => { if numerical_type_code != 0u8 { return Err(InvalidData); @@ -76,6 +87,7 @@ pub(crate) enum ColumnTypeCategory { Bool = 0u8, Str = 1u8, Numerical = 2u8, + IpAddr = 3u8, } impl ColumnTypeCategory { @@ -88,6 +100,7 @@ impl ColumnTypeCategory { 0u8 => Ok(Self::Bool), 1u8 => Ok(Self::Str), 2u8 => Ok(Self::Numerical), + 3u8 => Ok(Self::IpAddr), _ => Err(InvalidData), } } diff --git a/columnar/src/columnar/writer/column_operation.rs b/columnar/src/columnar/writer/column_operation.rs index 596bcf06e..c9d8821e7 100644 --- a/columnar/src/columnar/writer/column_operation.rs +++ b/columnar/src/columnar/writer/column_operation.rs @@ -1,3 +1,5 @@ +use std::net::Ipv6Addr; + use crate::dictionary::UnorderedId; use crate::utils::{place_bits, pop_first_byte, select_bits}; use crate::value::NumericalValue; @@ -25,12 +27,12 @@ struct ColumnOperationMetadata { impl ColumnOperationMetadata { fn to_code(self) -> u8 { - place_bits::<0, 4>(self.len) | place_bits::<4, 8>(self.op_type.to_code()) + place_bits::<0, 6>(self.len) | place_bits::<6, 8>(self.op_type.to_code()) } fn try_from_code(code: u8) -> Result { - let len = select_bits::<0, 4>(code); - let typ_code = select_bits::<4, 8>(code); + let len = select_bits::<0, 6>(code); + let typ_code = select_bits::<6, 8>(code); let column_type = ColumnOperationType::try_from_code(typ_code)?; Ok(ColumnOperationMetadata { op_type: column_type, @@ -142,9 +144,21 @@ impl SymbolValue for bool { } } +impl SymbolValue for Ipv6Addr { + fn serialize(self, buffer: &mut [u8]) -> u8 { + buffer[0..16].copy_from_slice(&self.octets()); + 16 + } + + fn deserialize(bytes: &[u8]) -> Self { + let octets: [u8; 16] = bytes[0..16].try_into().unwrap(); + Ipv6Addr::from(octets) + } +} + #[derive(Default)] struct MiniBuffer { - pub bytes: [u8; 10], + pub bytes: [u8; 17], pub len: u8, } diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index c3522448b..1e6afcbdf 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -4,6 +4,7 @@ mod serializer; mod value_index; use std::io; +use std::net::Ipv6Addr; use column_operation::ColumnOperation; use common::CountingWriter; @@ -11,7 +12,9 @@ use serializer::ColumnarSerializer; use stacker::{Addr, ArenaHashMap, MemoryArena}; use crate::column_index::SerializableColumnIndex; -use crate::column_values::{ColumnValues, MonotonicallyMappableToU64, VecColumn}; +use crate::column_values::{ + ColumnValues, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn, +}; use crate::columnar::column_type::{ColumnType, ColumnTypeCategory}; use crate::columnar::writer::column_writers::{ ColumnWriter, NumericalColumnWriter, StrColumnWriter, @@ -30,6 +33,7 @@ struct SpareBuffers { u64_values: Vec, f64_values: Vec, bool_values: Vec, + ip_addr_values: Vec, } /// Makes it possible to create a new columnar. @@ -48,6 +52,7 @@ struct SpareBuffers { pub struct ColumnarWriter { numerical_field_hash_map: ArenaHashMap, bool_field_hash_map: ArenaHashMap, + ip_addr_field_hash_map: ArenaHashMap, bytes_field_hash_map: ArenaHashMap, arena: MemoryArena, // Dictionaries used to store dictionary-encoded values. @@ -60,6 +65,7 @@ impl Default for ColumnarWriter { ColumnarWriter { numerical_field_hash_map: ArenaHashMap::new(10_000), bool_field_hash_map: ArenaHashMap::new(10_000), + ip_addr_field_hash_map: ArenaHashMap::new(10_000), bytes_field_hash_map: ArenaHashMap::new(10_000), dictionaries: Vec::new(), arena: MemoryArena::default(), @@ -90,6 +96,22 @@ impl ColumnarWriter { ); } + pub fn record_ip_addr(&mut self, doc: RowId, column_name: &str, ip_addr: Ipv6Addr) { + assert!( + !column_name.as_bytes().contains(&0u8), + "key may not contain the 0 byte" + ); + let (hash_map, arena) = (&mut self.ip_addr_field_hash_map, &mut self.arena); + hash_map.mutate_or_create( + column_name.as_bytes(), + |column_opt: Option| { + let mut column: ColumnWriter = column_opt.unwrap_or_default(); + column.record(doc, ip_addr, arena); + column + }, + ); + } + pub fn record_bool(&mut self, doc: RowId, column_name: &str, val: bool) { assert!( !column_name.as_bytes().contains(&0u8), @@ -148,9 +170,16 @@ impl ColumnarWriter { .iter() .map(|(term, addr, _)| (term, ColumnTypeCategory::Bool, addr)), ); + field_columns.extend( + self.ip_addr_field_hash_map + .iter() + .map(|(term, addr, _)| (term, ColumnTypeCategory::IpAddr, addr)), + ); + field_columns.sort_unstable_by_key(|(column_name, col_type, _)| (*column_name, *col_type)); let (arena, buffers, dictionaries) = (&self.arena, &mut self.buffers, &self.dictionaries); let mut symbol_byte_buffer: Vec = Vec::new(); + for (column_name, bytes_or_numerical, addr) in field_columns { match bytes_or_numerical { ColumnTypeCategory::Bool => { @@ -166,6 +195,19 @@ impl ColumnarWriter { &mut column_serializer, )?; } + ColumnTypeCategory::IpAddr => { + let column_writer: ColumnWriter = self.ip_addr_field_hash_map.read(addr); + let cardinality = column_writer.get_cardinality(num_docs); + let mut column_serializer = + serializer.serialize_column(column_name, ColumnType::IpAddr); + serialize_ip_addr_column( + cardinality, + num_docs, + column_writer.operation_iterator(arena, &mut symbol_byte_buffer), + buffers, + &mut column_serializer, + )?; + } ColumnTypeCategory::Str => { let str_column_writer: StrColumnWriter = self.bytes_field_hash_map.read(addr); let dictionary_builder = @@ -317,6 +359,76 @@ fn serialize_bool_column( Ok(()) } +fn serialize_ip_addr_column( + cardinality: Cardinality, + num_docs: RowId, + column_operations_it: impl Iterator>, + buffers: &mut SpareBuffers, + wrt: &mut impl io::Write, +) -> io::Result<()> { + let SpareBuffers { + value_index_builders, + ip_addr_values, + .. + } = buffers; + serialize_column_u128( + column_operations_it, + cardinality, + num_docs, + value_index_builders, + ip_addr_values, + wrt, + )?; + Ok(()) +} + +fn serialize_column_u128< + T: Copy + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU128 + PartialOrd, +>( + op_iterator: impl Iterator>, + cardinality: Cardinality, + num_docs: RowId, + value_index_builders: &mut PreallocatedIndexBuilders, + values: &mut Vec, + mut wrt: impl io::Write, +) -> io::Result<()> +where + for<'a> VecColumn<'a, T>: ColumnValues, +{ + values.clear(); + // TODO: split index and values + let serializable_column_index = match cardinality { + Cardinality::Full => { + consume_operation_iterator( + op_iterator, + value_index_builders.borrow_required_index_builder(), + values, + ); + SerializableColumnIndex::Full + } + Cardinality::Optional => { + let optional_index_builder = value_index_builders.borrow_optional_index_builder(); + consume_operation_iterator(op_iterator, optional_index_builder, values); + let optional_index = optional_index_builder.finish(num_docs); + SerializableColumnIndex::Optional(Box::new(optional_index)) + } + Cardinality::Multivalued => { + let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); + consume_operation_iterator(op_iterator, multivalued_index_builder, values); + let _multivalued_index = multivalued_index_builder.finish(num_docs); + todo!(); + // SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + } + }; + crate::column::serialize_column_u128( + serializable_column_index, + || values.iter().cloned(), + values.len() as u32, + &mut wrt, + )?; + Ok(()) +} + fn serialize_column< T: Copy + Default + std::fmt::Debug + Send + Sync + MonotonicallyMappableToU64 + PartialOrd, >( @@ -349,7 +461,7 @@ where Cardinality::Multivalued => { let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); consume_operation_iterator(op_iterator, multivalued_index_builder, values); - let multivalued_index = multivalued_index_builder.finish(num_docs); + let _multivalued_index = multivalued_index_builder.finish(num_docs); todo!(); // SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) } diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index fdfb7ad64..940ec3dbc 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -1,5 +1,5 @@ use std::io; -use std::net::IpAddr; +use std::net::Ipv6Addr; use common::file_slice::FileSlice; use common::{HasLen, OwnedBytes}; @@ -14,7 +14,7 @@ pub enum DynamicColumn { I64(Column), U64(Column), F64(Column), - IpAddr(Column), + IpAddr(Column), DateTime(Column), Str(BytesColumn), } @@ -49,6 +49,12 @@ impl From for DynamicColumn { } } +impl From> for DynamicColumn { + fn from(column: Column) -> Self { + DynamicColumn::IpAddr(column) + } +} + #[derive(Clone)] pub struct DynamicColumnHandle { pub(crate) file_slice: FileSlice, @@ -81,6 +87,7 @@ impl DynamicColumnHandle { } }, ColumnType::Bool => crate::column::open_column_u64::(column_bytes)?.into(), + ColumnType::IpAddr => crate::column::open_column_u128::(column_bytes)?.into(), }; Ok(dynamic_column) } diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 45f1a1898..70a4ec34b 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -1,3 +1,6 @@ +use std::net::{IpAddr, Ipv6Addr}; + +use crate::column_values::MonotonicallyMappableToU128; use crate::columnar::ColumnType; use crate::dynamic_column::{DynamicColumn, DynamicColumnHandle}; use crate::value::NumericalValue; @@ -36,6 +39,34 @@ fn test_dataframe_writer_bool() { assert_eq!(&vals, &[None, Some(false), None, Some(true), None,]); } +#[test] +fn test_dataframe_writer_ip_addr() { + let mut dataframe_writer = ColumnarWriter::default(); + dataframe_writer.record_ip_addr(1, "ip_addr", Ipv6Addr::from_u128(1001)); + dataframe_writer.record_ip_addr(3, "ip_addr", Ipv6Addr::from_u128(1050)); + let mut buffer: Vec = Vec::new(); + dataframe_writer.serialize(5, &mut buffer).unwrap(); + let columnar = ColumnarReader::open(buffer).unwrap(); + assert_eq!(columnar.num_columns(), 1); + let cols: Vec = columnar.read_columns("ip_addr").unwrap(); + assert_eq!(cols.len(), 1); + // assert_eq!(cols[0].num_bytes(), 29); + assert_eq!(cols[0].column_type(), ColumnType::IpAddr); + let dyn_bool_col = cols[0].open().unwrap(); + let DynamicColumn::IpAddr(ip_col) = dyn_bool_col else { panic!(); }; + let vals: Vec> = (0..5).map(|row_id| ip_col.first(row_id)).collect(); + assert_eq!( + &vals, + &[ + None, + Some(Ipv6Addr::from_u128(1001)), + None, + Some(Ipv6Addr::from_u128(1050)), + None, + ] + ); +} + #[test] fn test_dataframe_writer_numerical() { let mut dataframe_writer = ColumnarWriter::default();