diff --git a/awswrangler/s3/__init__.py b/awswrangler/s3/__init__.py
index 3c53e0034..30b8320f2 100644
--- a/awswrangler/s3/__init__.py
+++ b/awswrangler/s3/__init__.py
@@ -13,7 +13,7 @@
 from awswrangler.s3._select import select_query
 from awswrangler.s3._upload import upload
 from awswrangler.s3._wait import wait_objects_exist, wait_objects_not_exist
-from awswrangler.s3._write_deltalake import to_deltalake
+from awswrangler.s3._write_deltalake import to_deltalake, to_deltalake_streaming
 from awswrangler.s3._write_excel import to_excel
 from awswrangler.s3._write_orc import to_orc
 from awswrangler.s3._write_parquet import store_parquet_metadata, to_parquet
@@ -49,6 +49,7 @@
     "to_csv",
     "to_json",
     "to_deltalake",
+    "to_deltalake_streaming",
     "to_excel",
     "read_excel",
     "download",
diff --git a/awswrangler/s3/_write_deltalake.py b/awswrangler/s3/_write_deltalake.py
index bc5e23e26..a60318fa8 100644
--- a/awswrangler/s3/_write_deltalake.py
+++ b/awswrangler/s3/_write_deltalake.py
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING, Any, Iterable, Iterator, Literal
 
 import boto3
 import pandas as pd
@@ -30,6 +30,7 @@
     defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=boto3_session).items()}
     defaults["AWS_REGION"] = defaults.pop("REGION_NAME")
     defaults["AWS_SESSION_TOKEN"] = "" if defaults["AWS_SESSION_TOKEN"] is None else defaults["AWS_SESSION_TOKEN"]
+
     s3_additional_kwargs = s3_additional_kwargs or {}
 
     s3_lock_arguments = {}
@@ -133,3 +134,93 @@
         schema_mode=schema_mode,
         storage_options=storage_options,
     )
+
+
+def _df_iter_to_record_batch_reader(
+    df_iter: Iterable[pd.DataFrame],
+    *,
+    index: bool,
+    dtype: dict[str, str],
+    target_schema: pa.Schema | None = None,
+    batch_size: int | None = None,
+) -> tuple[pa.RecordBatchReader, pa.Schema]:
+    it = iter(df_iter)
+
+    first_df: pd.DataFrame | None = None
+    for df in it:
+        if not df.empty:
+            first_df = df
+            break
+
+    if first_df is None:
+        empty_schema = pa.schema([])
+        empty_reader = pa.RecordBatchReader.from_batches(empty_schema, [])
+        return empty_reader, empty_schema
+
+    schema = target_schema or _data_types.pyarrow_schema_from_pandas(
+        df=first_df, index=index, ignore_cols=None, dtype=dtype
+    )
+
+    def batches() -> Iterator[pa.RecordBatch]:
+        first_tbl: pa.Table = _df_to_table(first_df, schema, index, dtype)
+        for b in first_tbl.to_batches(batch_size) if batch_size is not None else first_tbl.to_batches():
+            yield b
+
+        for df in it:
+            if df.empty:
+                continue
+            tbl: pa.Table = _df_to_table(df, schema, index, dtype)
+            for b in tbl.to_batches(batch_size) if batch_size is not None else tbl.to_batches():
+                yield b
+
+    reader = pa.RecordBatchReader.from_batches(schema, batches())
+    return reader, schema
+
+
+@_utils.check_optional_dependency(deltalake, "deltalake")
+@Experimental
+def to_deltalake_streaming(
+    *,
+    dfs: Iterable[pd.DataFrame],
+    path: str,
+    index: bool = False,
+    mode: Literal["error", "append", "overwrite", "ignore"] = "append",
+    dtype: dict[str, str] | None = None,
+    partition_cols: list[str] | None = None,
+    schema_mode: Literal["overwrite", "merge"] | None = None,
+    lock_dynamodb_table: str | None = None,
+    s3_allow_unsafe_rename: bool = False,
+    boto3_session: boto3.Session | None = None,
+    s3_additional_kwargs: dict[str, str] | None = None,
+    batch_size: int | None = None,
+    target_file_size: int | None = None,
+) -> None:
+    dtype = dtype or {}
+
+    storage_options = _set_default_storage_options_kwargs(
+        boto3_session=boto3_session,
+        s3_additional_kwargs=s3_additional_kwargs,
+        s3_allow_unsafe_rename=s3_allow_unsafe_rename,
+        lock_dynamodb_table=lock_dynamodb_table,
+    )
+
+    reader, schema = _df_iter_to_record_batch_reader(
+        df_iter=dfs,
+        index=index,
+        dtype=dtype,
+        target_schema=None,
+        batch_size=batch_size,
+    )
+
+    if len(schema) == 0:
+        return
+
+    deltalake.write_deltalake(
+        table_or_uri=path,
+        data=reader,
+        partition_by=partition_cols,
+        mode=mode,
+        schema_mode=schema_mode,
+        storage_options=storage_options,
+        target_file_size=target_file_size,
+    )
diff --git a/tests/unit/test_s3_deltalake.py b/tests/unit/test_s3_deltalake.py
index 780738892..201179f09 100644
--- a/tests/unit/test_s3_deltalake.py
+++ b/tests/unit/test_s3_deltalake.py
@@ -1,24 +1,36 @@
 from __future__ import annotations
 
-from typing import Any, Iterator
+from typing import Any, Iterable, Iterator
 
 import boto3
+import pyarrow as pa
 import pytest
+from pandas.testing import assert_frame_equal
 
 import awswrangler as wr
 import awswrangler.pandas as pd
+from awswrangler.s3._write_deltalake import _df_iter_to_record_batch_reader
 
 from .._utils import (
     get_time_str_with_random_suffix,
 )
 
 
+def assert_df_equal_unordered(left: pd.DataFrame, right: pd.DataFrame, by: list[str]) -> None:
+    """Compare two dataframes ignoring row order and dtypes."""
+    l2 = left.sort_values(by).reset_index(drop=True)
+    r2 = right.sort_values(by).reset_index(drop=True)
+
+    assert_frame_equal(l2, r2, check_dtype=False, check_like=True)
+
+
 @pytest.fixture(scope="session")
 def lock_dynamodb_table() -> Iterator[str]:
     name = f"deltalake_lock_{get_time_str_with_random_suffix()}"
     print(f"Table name: {name}")
 
     dynamodb_client = boto3.client("dynamodb")
+
     dynamodb_client.create_table(
         TableName=name,
         BillingMode="PAY_PER_REQUEST",
@@ -94,3 +106,165 @@ def test_read_deltalake_partitions(path: str, lock_settings: dict[str, Any]) ->
 
     df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")])
     assert df2.shape == (1, 1)
+
+
+@pytest.mark.parametrize("chunksize", [2, 10])
+def test_to_deltalake_streaming_single_commit_overwrite(
+    path: str,
+    lock_settings: dict[str, Any],
+    chunksize: int,
+) -> None:
+    df1 = pd.DataFrame({"c0": [1, 1], "c1": [10, 11], "v": [100, 200]})
+    df2 = pd.DataFrame({"c0": [2, 2], "c1": [12, 13], "v": [300, 400]})
+
+    def dfs() -> Iterable[pd.DataFrame]:
+        yield df1
+        yield df2
+
+    wr.s3.to_deltalake_streaming(
+        dfs=dfs(),
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    out = wr.s3.read_deltalake(path=path)
+
+    expected = pd.concat([df1, df2], ignore_index=True)
+    assert_df_equal_unordered(expected, out, by=["c0", "c1", "v"])
+
+
+def test_to_deltalake_streaming_creates_one_version_per_run(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    df_run1_a = pd.DataFrame({"c0": [1], "c1": [10], "v": [111]})
+    df_run1_b = pd.DataFrame({"c0": [1], "c1": [11], "v": [112]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df_run1_a, df_run1_b],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    run1_expected = pd.concat([df_run1_a, df_run1_b], ignore_index=True)
+    latest_v0 = wr.s3.read_deltalake(path=path)
+    assert_df_equal_unordered(run1_expected, latest_v0, by=["c0", "c1", "v"])
+
+    df_run2_a = pd.DataFrame({"c0": [2], "c1": [12], "v": [221]})
+    df_run2_b = pd.DataFrame({"c0": [2], "c1": [13], "v": [222]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df_run2_a, df_run2_b],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    v0 = wr.s3.read_deltalake(path=path, version=0)
+    v1 = wr.s3.read_deltalake(path=path, version=1)
+    run2_expected = pd.concat([df_run2_a, df_run2_b], ignore_index=True)
+
+    assert_df_equal_unordered(run1_expected, v0, by=["c0", "c1", "v"])
+    assert_df_equal_unordered(run2_expected, v1, by=["c0", "c1", "v"])
+
+
+def test_to_deltalake_streaming_partitions_and_filters(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    df1 = pd.DataFrame({"c0": [1, 1, 2], "c1": [10, 11, 12], "v": [1, 2, 3]})
+    df2 = pd.DataFrame({"c0": [2, 3, 3], "c1": [13, 14, 15], "v": [4, 5, 6]})
+
+    wr.s3.to_deltalake_streaming(
+        dfs=[df1, df2],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+
+    only_c02 = wr.s3.read_deltalake(
+        path=path,
+        partitions=[("c0", "=", "2")],
+        columns=["v", "c1"],
+    )
+    assert set(only_c02["c1"].tolist()) == {12, 13}
+    assert sorted(only_c02["v"].tolist()) == [3, 4]
+
+
+def test_to_deltalake_streaming_empty_iterator_is_noop(
+    path: str,
+    lock_settings: dict[str, Any],
+) -> None:
+    wr.s3.to_deltalake_streaming(
+        dfs=[pd.DataFrame({"c0": [1], "c1": [1], "v": [1]})],
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+    baseline = wr.s3.read_deltalake(path=path)
+
+    def empty() -> Iterator[pd.DataFrame]:
+        if False:
+            yield pd.DataFrame()  # pragma: no cover
+
+    wr.s3.to_deltalake_streaming(
+        dfs=empty(),
+        path=path,
+        mode="overwrite",
+        partition_cols=["c0", "c1"],
+        **lock_settings,
+    )
+    after = wr.s3.read_deltalake(path=path)
+    assert after.equals(baseline)
+
+
+def test_df_iter_to_record_batch_reader_schema_and_rows() -> None:
+    df_empty = pd.DataFrame({"a": [], "b": []})
+    df1 = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
+    df2 = pd.DataFrame({"a": [3], "b": ["z"]})
+
+    reader, schema = _df_iter_to_record_batch_reader(
+        df_iter=[df_empty, df1, df2],
+        index=False,
+        dtype={},
+        target_schema=None,
+        batch_size=None,
+    )
+
+    assert isinstance(schema, pa.Schema)
+    assert {f.name for f in schema} == {"a", "b"}
+
+    table: pa.Table = reader.read_all()
+    pdf = table.to_pandas()
+    assert len(pdf) == 3
+    assert sorted(pdf["a"].tolist()) == [1, 2, 3]
+    assert set(pdf["b"].tolist()) == {"x", "y", "z"}
+
+
+def test_df_iter_to_record_batch_reader_respects_batch_size() -> None:
+    df1 = pd.DataFrame({"a": list(range(5)), "b": ["x"] * 5})
+    df2 = pd.DataFrame({"a": list(range(5, 9)), "b": ["y"] * 4})
+
+    reader, _ = _df_iter_to_record_batch_reader(
+        df_iter=[df1, df2],
+        index=False,
+        dtype={},
+        target_schema=None,
+        batch_size=3,
+    )
+
+    batch_count = 0
+    row_count = 0
+    for batch in reader:
+        batch_count += 1
+        row_count += batch.num_rows
+
+    assert batch_count >= 3
+    assert row_count == 9
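
For reviewers, a minimal usage sketch of the new API (not part of the patch). The chunks() generator, the bucket/prefix, the lock table name, and the DataFrame contents are made up for illustration; it assumes a build of awswrangler that includes this branch plus the optional deltalake dependency:

    import pandas as pd

    import awswrangler as wr


    def chunks():
        # Any iterable of DataFrames works; every non-empty chunk is funneled
        # through a single Arrow RecordBatchReader and lands in the Delta table
        # as one commit (one table version) per call.
        for i in range(3):
            yield pd.DataFrame({"c0": [i, i], "v": [i * 10, i * 10 + 1]})


    wr.s3.to_deltalake_streaming(
        dfs=chunks(),
        path="s3://example-bucket/streaming-delta-table/",  # hypothetical location
        mode="overwrite",
        partition_cols=["c0"],
        lock_dynamodb_table="example-delta-lock",  # or s3_allow_unsafe_rename=True for single-writer setups
    )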