From f02f89577f8f9cb71b0a512496c95027ed6df296 Mon Sep 17 00:00:00 2001 From: Shawn Koschik Date: Thu, 30 Oct 2025 19:23:41 -0400 Subject: [PATCH 1/5] feat(s3): add to_deltalake_streaming for single-commit Delta writes --- awswrangler/s3/__init__.py | 3 +- awswrangler/s3/_write_deltalake.py | 121 ++++++++++++++++++++++++++++- 2 files changed, 122 insertions(+), 2 deletions(-) diff --git a/awswrangler/s3/__init__.py b/awswrangler/s3/__init__.py index 3c53e0034..30b8320f2 100644 --- a/awswrangler/s3/__init__.py +++ b/awswrangler/s3/__init__.py @@ -13,7 +13,7 @@ from awswrangler.s3._select import select_query from awswrangler.s3._upload import upload from awswrangler.s3._wait import wait_objects_exist, wait_objects_not_exist -from awswrangler.s3._write_deltalake import to_deltalake +from awswrangler.s3._write_deltalake import to_deltalake, to_deltalake_streaming from awswrangler.s3._write_excel import to_excel from awswrangler.s3._write_orc import to_orc from awswrangler.s3._write_parquet import store_parquet_metadata, to_parquet @@ -49,6 +49,7 @@ "to_csv", "to_json", "to_deltalake", + "to_deltalake_streaming", "to_excel", "read_excel", "download", diff --git a/awswrangler/s3/_write_deltalake.py b/awswrangler/s3/_write_deltalake.py index bc5e23e26..b0034936d 100644 --- a/awswrangler/s3/_write_deltalake.py +++ b/awswrangler/s3/_write_deltalake.py @@ -2,7 +2,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Iterable, Iterator, Literal import boto3 import pandas as pd @@ -30,6 +30,7 @@ def _set_default_storage_options_kwargs( defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()} defaults["AWS_REGION"] = defaults.pop("REGION_NAME") defaults["AWS_SESSION_TOKEN"] = "" if defaults["AWS_SESSION_TOKEN"] is None else defaults["AWS_SESSION_TOKEN"] + s3_additional_kwargs = s3_additional_kwargs or {} s3_lock_arguments = {} @@ -133,3 +134,121 @@ def to_deltalake( schema_mode=schema_mode, storage_options=storage_options, ) + + + +def _df_iter_to_record_batch_reader( + df_iter: Iterable[pd.DataFrame], + *, + index: bool, + dtype: dict[str, str], + target_schema: pa.Schema | None = None, + batch_size: int | None = None, +) -> tuple[pa.RecordBatchReader, pa.Schema]: + """ + Convert an iterable of Pandas DataFrames into a single Arrow RecordBatchReader + suitable for a single delta-rs commit. The first *non-empty* DataFrame fixes the schema. 
+ + Returns + ------- + (reader, schema) + reader: pa.RecordBatchReader streaming all chunks as Arrow batches + schema: pa.Schema used for conversion + """ + it = iter(df_iter) + + first_df: pd.DataFrame | None = None + for df in it: + if not df.empty: + first_df = df + break + + if first_df is None: + empty_schema = pa.schema([]) + empty_reader = pa.RecordBatchReader.from_batches(empty_schema, []) + return empty_reader, empty_schema + + schema = target_schema or _data_types.pyarrow_schema_from_pandas( + df=first_df, index=index, ignore_cols=None, dtype=dtype + ) + + def batches() -> Iterator[pa.RecordBatch]: + first_tbl: pa.Table = _df_to_table(first_df, schema, index, dtype) + for b in (first_tbl.to_batches(batch_size) if batch_size is not None else first_tbl.to_batches()): + yield b + + for df in it: + if df.empty: + continue + tbl: pa.Table = _df_to_table(df, schema, index, dtype) + for b in (tbl.to_batches(batch_size) if batch_size is not None else tbl.to_batches()): + yield b + + reader = pa.RecordBatchReader.from_batches(schema, batches()) + return reader, schema + + +@_utils.check_optional_dependency(deltalake, "deltalake") +@Experimental +def to_deltalake_streaming( + *, + dfs: Iterable[pd.DataFrame], + path: str, + index: bool = False, + mode: Literal["error", "append", "overwrite", "ignore"] = "append", + dtype: dict[str, str] | None = None, + partition_cols: list[str] | None = None, + schema_mode: Literal["overwrite", "merge"] | None = None, + lock_dynamodb_table: str | None = None, + s3_allow_unsafe_rename: bool = False, + boto3_session: boto3.Session | None = None, + s3_additional_kwargs: dict[str, str] | None = None, + batch_size: int | None = None, + max_open_files: int | None = None, + max_rows_per_file: int | None = None, + target_file_size: int | None = None, +) -> None: + """ + Write an iterable/generator of Pandas DataFrames to S3 as a Delta Lake table + in a SINGLE atomic commit (one table version). + + Use this for large "restatements" that are produced in chunks. Semantics mirror + `to_deltalake` (partitioning, schema handling, S3 locking, etc.). + + Notes + ----- + - The schema is fixed by the first *non-empty* chunk (plus any `dtype` coercions). + - All `partition_cols` must be present in every non-empty chunk. + - Prefer `lock_dynamodb_table` over `s3_allow_unsafe_rename=True` on S3. 
+ """ + dtype = dtype or {} + + storage_options = _set_default_storage_options_kwargs( + boto3_session=boto3_session, + s3_additional_kwargs=s3_additional_kwargs, + s3_allow_unsafe_rename=s3_allow_unsafe_rename, + lock_dynamodb_table=lock_dynamodb_table, + ) + + reader, schema = _df_iter_to_record_batch_reader( + df_iter=dfs, + index=index, + dtype=dtype, + target_schema=None, + batch_size=batch_size, + ) + + if len(schema) == 0: + return + + deltalake.write_deltalake( + table_or_uri=path, + data=reader, + partition_by=partition_cols, + mode=mode, + schema_mode=schema_mode, + storage_options=storage_options, + max_open_files=max_open_files, + max_rows_per_file=max_rows_per_file, + target_file_size=target_file_size, + ) \ No newline at end of file From ec1bc7ef6a00c235af948bf48f3ac01ed0768af7 Mon Sep 17 00:00:00 2001 From: Shawn Koschik Date: Fri, 31 Oct 2025 02:18:48 -0400 Subject: [PATCH 2/5] formatting --- awswrangler/s3/_write_deltalake.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/awswrangler/s3/_write_deltalake.py b/awswrangler/s3/_write_deltalake.py index b0034936d..38ab134e4 100644 --- a/awswrangler/s3/_write_deltalake.py +++ b/awswrangler/s3/_write_deltalake.py @@ -136,7 +136,6 @@ def to_deltalake( ) - def _df_iter_to_record_batch_reader( df_iter: Iterable[pd.DataFrame], *, @@ -174,14 +173,14 @@ def _df_iter_to_record_batch_reader( def batches() -> Iterator[pa.RecordBatch]: first_tbl: pa.Table = _df_to_table(first_df, schema, index, dtype) - for b in (first_tbl.to_batches(batch_size) if batch_size is not None else first_tbl.to_batches()): + for b in first_tbl.to_batches(batch_size) if batch_size is not None else first_tbl.to_batches(): yield b for df in it: if df.empty: continue tbl: pa.Table = _df_to_table(df, schema, index, dtype) - for b in (tbl.to_batches(batch_size) if batch_size is not None else tbl.to_batches()): + for b in tbl.to_batches(batch_size) if batch_size is not None else tbl.to_batches(): yield b reader = pa.RecordBatchReader.from_batches(schema, batches()) @@ -251,4 +250,4 @@ def to_deltalake_streaming( max_open_files=max_open_files, max_rows_per_file=max_rows_per_file, target_file_size=target_file_size, - ) \ No newline at end of file + ) From 530620f658e3f433437639f4d1d7ac9bda70407f Mon Sep 17 00:00:00 2001 From: Shawn Koschik Date: Fri, 31 Oct 2025 02:30:51 -0400 Subject: [PATCH 3/5] remove docstrings --- awswrangler/s3/_write_deltalake.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/awswrangler/s3/_write_deltalake.py b/awswrangler/s3/_write_deltalake.py index 38ab134e4..cfb76056e 100644 --- a/awswrangler/s3/_write_deltalake.py +++ b/awswrangler/s3/_write_deltalake.py @@ -144,16 +144,6 @@ def _df_iter_to_record_batch_reader( target_schema: pa.Schema | None = None, batch_size: int | None = None, ) -> tuple[pa.RecordBatchReader, pa.Schema]: - """ - Convert an iterable of Pandas DataFrames into a single Arrow RecordBatchReader - suitable for a single delta-rs commit. The first *non-empty* DataFrame fixes the schema. - - Returns - ------- - (reader, schema) - reader: pa.RecordBatchReader streaming all chunks as Arrow batches - schema: pa.Schema used for conversion - """ it = iter(df_iter) first_df: pd.DataFrame | None = None @@ -207,19 +197,6 @@ def to_deltalake_streaming( max_rows_per_file: int | None = None, target_file_size: int | None = None, ) -> None: - """ - Write an iterable/generator of Pandas DataFrames to S3 as a Delta Lake table - in a SINGLE atomic commit (one table version). 
- - Use this for large "restatements" that are produced in chunks. Semantics mirror - `to_deltalake` (partitioning, schema handling, S3 locking, etc.). - - Notes - ----- - - The schema is fixed by the first *non-empty* chunk (plus any `dtype` coercions). - - All `partition_cols` must be present in every non-empty chunk. - - Prefer `lock_dynamodb_table` over `s3_allow_unsafe_rename=True` on S3. - """ dtype = dtype or {} storage_options = _set_default_storage_options_kwargs( From 6a8cc81cbcc71767da1677b937f7aed0cc8a82c0 Mon Sep 17 00:00:00 2001 From: Shawn Koschik Date: Thu, 6 Nov 2025 13:29:02 -0500 Subject: [PATCH 4/5] fix: not supported --- awswrangler/s3/_write_deltalake.py | 4 - tests/unit/test_s3_deltalake.py | 176 ++++++++++++++++++++++++++++- 2 files changed, 175 insertions(+), 5 deletions(-) diff --git a/awswrangler/s3/_write_deltalake.py b/awswrangler/s3/_write_deltalake.py index cfb76056e..a60318fa8 100644 --- a/awswrangler/s3/_write_deltalake.py +++ b/awswrangler/s3/_write_deltalake.py @@ -193,8 +193,6 @@ def to_deltalake_streaming( boto3_session: boto3.Session | None = None, s3_additional_kwargs: dict[str, str] | None = None, batch_size: int | None = None, - max_open_files: int | None = None, - max_rows_per_file: int | None = None, target_file_size: int | None = None, ) -> None: dtype = dtype or {} @@ -224,7 +222,5 @@ def to_deltalake_streaming( mode=mode, schema_mode=schema_mode, storage_options=storage_options, - max_open_files=max_open_files, - max_rows_per_file=max_rows_per_file, target_file_size=target_file_size, ) diff --git a/tests/unit/test_s3_deltalake.py b/tests/unit/test_s3_deltalake.py index 780738892..b7c488272 100644 --- a/tests/unit/test_s3_deltalake.py +++ b/tests/unit/test_s3_deltalake.py @@ -1,24 +1,36 @@ from __future__ import annotations -from typing import Any, Iterator +from typing import Any, Iterator, Iterable import boto3 +import pyarrow as pa import pytest +from pandas.testing import assert_frame_equal import awswrangler as wr import awswrangler.pandas as pd +from awswrangler.s3._write_deltalake import _df_iter_to_record_batch_reader from .._utils import ( get_time_str_with_random_suffix, ) +def assert_df_equal_unordered(left: pd.DataFrame, right: pd.DataFrame, by: list[str]) -> None: + """Compare two dataframes ignoring row order and dtypes.""" + l2 = left.sort_values(by).reset_index(drop=True) + r2 = right.sort_values(by).reset_index(drop=True) + + assert_frame_equal(l2, r2, check_dtype=False, check_like=True) + + @pytest.fixture(scope="session") def lock_dynamodb_table() -> Iterator[str]: name = f"deltalake_lock_{get_time_str_with_random_suffix()}" print(f"Table name: {name}") dynamodb_client = boto3.client("dynamodb") + dynamodb_client.create_table( TableName=name, BillingMode="PAY_PER_REQUEST", @@ -94,3 +106,165 @@ def test_read_deltalake_partitions(path: str, lock_settings: dict[str, Any]) -> df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")]) assert df2.shape == (1, 1) + + +@pytest.mark.parametrize("chunksize", [2, 10]) +def test_to_deltalake_streaming_single_commit_overwrite( + path: str, + lock_settings: dict[str, Any], + chunksize: int, +) -> None: + df1 = pd.DataFrame({"c0": [1, 1], "c1": [10, 11], "v": [100, 200]}) + df2 = pd.DataFrame({"c0": [2, 2], "c1": [12, 13], "v": [300, 400]}) + + def dfs() -> Iterable[pd.DataFrame]: + yield df1 + yield df2 + + wr.s3.to_deltalake_streaming( + dfs=dfs(), + path=path, + mode="overwrite", + partition_cols=["c0", "c1"], + **lock_settings, + ) + + 
out = wr.s3.read_deltalake(path=path) + + expected = pd.concat([df1, df2], ignore_index=True) + assert_df_equal_unordered(expected, out, by=["c0", "c1", "v"]) + + +def test_to_deltalake_streaming_creates_one_version_per_run( + path: str, + lock_settings: dict[str, Any], +) -> None: + df_run1_a = pd.DataFrame({"c0": [1], "c1": [10], "v": [111]}) + df_run1_b = pd.DataFrame({"c0": [1], "c1": [11], "v": [112]}) + + wr.s3.to_deltalake_streaming( + dfs=[df_run1_a, df_run1_b], + path=path, + mode="overwrite", + partition_cols=["c0", "c1"], + **lock_settings, + ) + + run1_expected = pd.concat([df_run1_a, df_run1_b], ignore_index=True) + latest_v0 = wr.s3.read_deltalake(path=path) + assert_df_equal_unordered(run1_expected, latest_v0, by=["c0", "c1", "v"]) + + df_run2_a = pd.DataFrame({"c0": [2], "c1": [12], "v": [221]}) + df_run2_b = pd.DataFrame({"c0": [2], "c1": [13], "v": [222]}) + + wr.s3.to_deltalake_streaming( + dfs=[df_run2_a, df_run2_b], + path=path, + mode="overwrite", + partition_cols=["c0", "c1"], + **lock_settings, + ) + + v0 = wr.s3.read_deltalake(path=path, version=0) + v1 = wr.s3.read_deltalake(path=path, version=1) + run2_expected = pd.concat([df_run2_a, df_run2_b], ignore_index=True) + + assert_df_equal_unordered(run1_expected, v0, by=["c0", "c1", "v"]) + assert_df_equal_unordered(run2_expected, v1, by=["c0", "c1", "v"]) + + +def test_to_deltalake_streaming_partitions_and_filters( + path: str, + lock_settings: dict[str, Any], +) -> None: + df1 = pd.DataFrame({"c0": [1, 1, 2], "c1": [10, 11, 12], "v": [1, 2, 3]}) + df2 = pd.DataFrame({"c0": [2, 3, 3], "c1": [13, 14, 15], "v": [4, 5, 6]}) + + wr.s3.to_deltalake_streaming( + dfs=[df1, df2], + path=path, + mode="overwrite", + partition_cols=["c0", "c1"], + **lock_settings, + ) + + only_c02 = wr.s3.read_deltalake( + path=path, + partitions=[("c0", "=", "2")], + columns=["v", "c1"], + ) + assert set(only_c02["c1"].tolist()) == {12, 13} + assert sorted(only_c02["v"].tolist()) == [3, 4] + + +def test_to_deltalake_streaming_empty_iterator_is_noop( + path: str, + lock_settings: dict[str, Any], +) -> None: + wr.s3.to_deltalake_streaming( + dfs=[pd.DataFrame({"c0": [1], "c1": [1], "v": [1]})], + path=path, + mode="overwrite", + partition_cols=["c0", "c1"], + **lock_settings, + ) + baseline = wr.s3.read_deltalake(path=path) + + def empty() -> Iterator[pd.DataFrame]: + if False: + yield pd.DataFrame() # pragma: no cover + + wr.s3.to_deltalake_streaming( + dfs=empty(), + path=path, + mode="overwrite", + partition_cols=["c0", "c1"], + **lock_settings, + ) + after = wr.s3.read_deltalake(path=path) + assert after.equals(baseline) + + +def test_df_iter_to_record_batch_reader_schema_and_rows() -> None: + df_empty = pd.DataFrame({"a": [], "b": []}) + df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}) + df2 = pd.DataFrame({"a": [3], "b": ["z"]}) + + reader, schema = _df_iter_to_record_batch_reader( + df_iter=[df_empty, df1, df2], + index=False, + dtype={}, + target_schema=None, + batch_size=None, + ) + + assert isinstance(schema, pa.Schema) + assert {f.name for f in schema} == {"a", "b"} + + table: pa.Table = reader.read_all() + pdf = table.to_pandas() + assert len(pdf) == 3 + assert sorted(pdf["a"].tolist()) == [1, 2, 3] + assert set(pdf["b"].tolist()) == {"x", "y", "z"} + + +def test_df_iter_to_record_batch_reader_respects_batch_size() -> None: + df1 = pd.DataFrame({"a": list(range(5)), "b": ["x"] * 5}) + df2 = pd.DataFrame({"a": list(range(5, 9)), "b": ["y"] * 4}) + + reader, _ = _df_iter_to_record_batch_reader( + df_iter=[df1, df2], + index=False, + 
dtype={}, + target_schema=None, + batch_size=3, + ) + + batch_count = 0 + row_count = 0 + for batch in reader: + batch_count += 1 + row_count += batch.num_rows + + assert batch_count >= 3 + assert row_count == 9 From c0a24548df332f0518af57530932a55ba247b8be Mon Sep 17 00:00:00 2001 From: Shawn Koschik Date: Thu, 6 Nov 2025 13:32:10 -0500 Subject: [PATCH 5/5] fix: import sort --- tests/unit/test_s3_deltalake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_s3_deltalake.py b/tests/unit/test_s3_deltalake.py index b7c488272..201179f09 100644 --- a/tests/unit/test_s3_deltalake.py +++ b/tests/unit/test_s3_deltalake.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Iterator, Iterable +from typing import Any, Iterable, Iterator import boto3 import pyarrow as pa
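
Usage note (not part of the patches above): a minimal sketch of how the new API added in this series is intended to be called, assuming a generator that yields restatement chunks, an illustrative table path "s3://my-bucket/my-delta-table/", and an existing DynamoDB lock table named "delta_lock" (path and lock-table name are placeholders, not values from the patches).

import pandas as pd
import awswrangler as wr

def restatement_chunks():
    # Illustrative generator: every chunk carries the partition columns and the payload.
    for day in range(3):
        yield pd.DataFrame({"c0": [day, day], "c1": [0, 1], "v": [day * 10, day * 10 + 1]})

# All chunks are streamed into a single Delta commit (one new table version),
# so readers never observe a partially written restatement.
wr.s3.to_deltalake_streaming(
    dfs=restatement_chunks(),
    path="s3://my-bucket/my-delta-table/",  # placeholder path
    mode="overwrite",
    partition_cols=["c0", "c1"],
    lock_dynamodb_table="delta_lock",  # preferred over s3_allow_unsafe_rename=True, per the docstring in PATCH 1/5
)

The call mirrors the keyword-only signature introduced in PATCH 1/5 (and trimmed in PATCH 4/5); the schema is fixed by the first non-empty chunk, as exercised by the tests in PATCH 4/5.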