diff --git a/.github/workflows/pr_modular_tests.yml b/.github/workflows/pr_modular_tests.yml index 7081ee518d55..afaca1b90d3b 100644 --- a/.github/workflows/pr_modular_tests.yml +++ b/.github/workflows/pr_modular_tests.yml @@ -77,62 +77,46 @@ jobs: run_fast_tests: needs: [check_code_quality, check_repository_consistency] - strategy: - fail-fast: false - matrix: - config: - - name: Fast PyTorch Modular Pipeline CPU tests - framework: pytorch_pipelines - runner: aws-highmemory-32-plus - image: diffusers/diffusers-pytorch-cpu - report: torch_cpu_modular_pipelines - - name: ${{ matrix.config.name }} - + name: Fast PyTorch Modular Pipeline CPU tests runs-on: - group: ${{ matrix.config.runner }} - + group: aws-highmemory-32-plus container: - image: ${{ matrix.config.image }} + image: diffusers/diffusers-pytorch-cpu options: --shm-size "16gb" --ipc host -v /mnt/hf_cache:/mnt/cache/ - defaults: run: shell: bash steps: - - name: Checkout diffusers - uses: actions/checkout@v3 - with: - fetch-depth: 2 - - - name: Install dependencies - run: | - uv pip install -e ".[quality]" - uv pip uninstall transformers huggingface_hub && uv pip install --prerelease allow -U transformers@git+https://github.com/huggingface/transformers.git - uv pip uninstall accelerate && uv pip install -U accelerate@git+https://github.com/huggingface/accelerate.git --no-deps - - - name: Environment - run: | - python utils/print_env.py + - name: Checkout diffusers + uses: actions/checkout@v3 + with: + fetch-depth: 2 - - name: Run fast PyTorch Pipeline CPU tests - if: ${{ matrix.config.framework == 'pytorch_pipelines' }} - run: | - pytest -n 8 --max-worker-restart=0 --dist=loadfile \ - -s -v -k "not Flax and not Onnx" \ - --make-reports=tests_${{ matrix.config.report }} \ - tests/modular_pipelines + - name: Install dependencies + run: | + uv pip install -e ".[quality]" + uv pip uninstall transformers huggingface_hub && uv pip install --prerelease allow -U transformers@git+https://github.com/huggingface/transformers.git + uv pip uninstall accelerate && uv pip install -U accelerate@git+https://github.com/huggingface/accelerate.git --no-deps - - name: Failure short reports - if: ${{ failure() }} - run: cat reports/tests_${{ matrix.config.report }}_failures_short.txt + - name: Environment + run: | + python utils/print_env.py - - name: Test suite reports artifacts - if: ${{ always() }} - uses: actions/upload-artifact@v4 - with: - name: pr_${{ matrix.config.framework }}_${{ matrix.config.report }}_test_reports - path: reports + - name: Run fast PyTorch Pipeline CPU tests + run: | + pytest -n 8 --max-worker-restart=0 --dist=loadfile \ + -s -v \ + --make-reports=tests_torch_cpu_modular_pipelines \ + tests/modular_pipelines + - name: Failure short reports + if: ${{ failure() }} + run: cat reports/tests_torch_cpu_modular_pipelines_failures_short.txt + - name: Test suite reports artifacts + if: ${{ always() }} + uses: actions/upload-artifact@v4 + with: + name: pr_pytorch_pipelines_torch_cpu_modular_pipelines_test_reports + path: reports diff --git a/src/diffusers/utils/dynamic_modules_utils.py b/src/diffusers/utils/dynamic_modules_utils.py index b2ef5a29e03e..178657f00ae9 100644 --- a/src/diffusers/utils/dynamic_modules_utils.py +++ b/src/diffusers/utils/dynamic_modules_utils.py @@ -301,7 +301,6 @@ def get_cached_module_file( pretrained_model_name_or_path = str(pretrained_model_name_or_path) module_file_or_url = os.path.join(pretrained_model_name_or_path, module_file) - if os.path.isfile(module_file_or_url): resolved_module_file = module_file_or_url submodule = "local" diff --git a/tests/conftest.py b/tests/conftest.py index fd76d1c84ee7..9558c23d3062 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,6 +32,8 @@ def pytest_configure(config): config.addinivalue_line("markers", "big_accelerator: marks tests as requiring big accelerator resources") + config.addinivalue_line("markers", "slow: mark test as slow") + config.addinivalue_line("markers", "nightly: mark test as nightly") def pytest_addoption(parser): diff --git a/tests/modular_pipelines/test_modular_pipelines_custom_blocks.py b/tests/modular_pipelines/test_modular_pipelines_custom_blocks.py new file mode 100644 index 000000000000..9c5fd5be326d --- /dev/null +++ b/tests/modular_pipelines/test_modular_pipelines_custom_blocks.py @@ -0,0 +1,272 @@ +# Copyright 2025 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import tempfile +from collections import deque +from typing import List + +import numpy as np +import torch + +from diffusers import FluxTransformer2DModel +from diffusers.modular_pipelines import ( + ComponentSpec, + InputParam, + ModularPipelineBlocks, + OutputParam, + PipelineState, + WanModularPipeline, +) + +from ..testing_utils import nightly, require_torch, slow + + +class DummyCustomBlockSimple(ModularPipelineBlocks): + def __init__(self, use_dummy_model_component=False): + self.use_dummy_model_component = use_dummy_model_component + super().__init__() + + @property + def expected_components(self): + if self.use_dummy_model_component: + return [ComponentSpec("transformer", FluxTransformer2DModel)] + else: + return [] + + @property + def inputs(self) -> List[InputParam]: + return [InputParam("prompt", type_hint=str, required=True, description="Prompt to use")] + + @property + def intermediate_inputs(self) -> List[InputParam]: + return [] + + @property + def intermediate_outputs(self) -> List[OutputParam]: + return [ + OutputParam( + "output_prompt", + type_hint=str, + description="Modified prompt", + ) + ] + + def __call__(self, components, state: PipelineState) -> PipelineState: + block_state = self.get_block_state(state) + + old_prompt = block_state.prompt + block_state.output_prompt = "Modular diffusers + " + old_prompt + self.set_block_state(state, block_state) + + return components, state + + +CODE_STR = """ +from diffusers.modular_pipelines import ( + ComponentSpec, + InputParam, + ModularPipelineBlocks, + OutputParam, + PipelineState, + WanModularPipeline, +) +from typing import List + +class DummyCustomBlockSimple(ModularPipelineBlocks): + def __init__(self, use_dummy_model_component=False): + self.use_dummy_model_component = use_dummy_model_component + super().__init__() + + @property + def expected_components(self): + if self.use_dummy_model_component: + return [ComponentSpec("transformer", FluxTransformer2DModel)] + else: + return [] + + @property + def inputs(self) -> List[InputParam]: + return [InputParam("prompt", type_hint=str, required=True, description="Prompt to use")] + + @property + def intermediate_inputs(self) -> List[InputParam]: + return [] + + @property + def intermediate_outputs(self) -> List[OutputParam]: + return [ + OutputParam( + "output_prompt", + type_hint=str, + description="Modified prompt", + ) + ] + + def __call__(self, components, state: PipelineState) -> PipelineState: + block_state = self.get_block_state(state) + + old_prompt = block_state.prompt + block_state.output_prompt = "Modular diffusers + " + old_prompt + self.set_block_state(state, block_state) + + return components, state +""" + + +class TestModularCustomBlocks: + def _test_block_properties(self, block): + assert not block.expected_components + assert not block.intermediate_inputs + + actual_inputs = [inp.name for inp in block.inputs] + actual_intermediate_outputs = [out.name for out in block.intermediate_outputs] + assert actual_inputs == ["prompt"] + assert actual_intermediate_outputs == ["output_prompt"] + + def test_custom_block_properties(self): + custom_block = DummyCustomBlockSimple() + self._test_block_properties(custom_block) + + def test_custom_block_output(self): + custom_block = DummyCustomBlockSimple() + pipe = custom_block.init_pipeline() + prompt = "Diffusers is nice" + output = pipe(prompt=prompt) + + actual_inputs = [inp.name for inp in custom_block.inputs] + actual_intermediate_outputs = [out.name for out in custom_block.intermediate_outputs] + assert sorted(output.values) == sorted(actual_inputs + actual_intermediate_outputs) + + output_prompt = output.values["output_prompt"] + assert output_prompt.startswith("Modular diffusers + ") + + def test_custom_block_saving_loading(self): + custom_block = DummyCustomBlockSimple() + + with tempfile.TemporaryDirectory() as tmpdir: + custom_block.save_pretrained(tmpdir) + assert any("modular_config.json" in k for k in os.listdir(tmpdir)) + + with open(os.path.join(tmpdir, "modular_config.json"), "r") as f: + config = json.load(f) + auto_map = config["auto_map"] + assert auto_map == {"ModularPipelineBlocks": "test_modular_pipelines_custom_blocks.DummyCustomBlockSimple"} + + # For now, the Python script that implements the custom block has to be manually pushed to the Hub. + # This is why, we have to separately save the Python script here. + code_path = os.path.join(tmpdir, "test_modular_pipelines_custom_blocks.py") + with open(code_path, "w") as f: + f.write(CODE_STR) + + loaded_custom_block = ModularPipelineBlocks.from_pretrained(tmpdir, trust_remote_code=True) + + pipe = loaded_custom_block.init_pipeline() + prompt = "Diffusers is nice" + output = pipe(prompt=prompt) + + actual_inputs = [inp.name for inp in loaded_custom_block.inputs] + actual_intermediate_outputs = [out.name for out in loaded_custom_block.intermediate_outputs] + assert sorted(output.values) == sorted(actual_inputs + actual_intermediate_outputs) + + output_prompt = output.values["output_prompt"] + assert output_prompt.startswith("Modular diffusers + ") + + def test_custom_block_supported_components(self): + custom_block = DummyCustomBlockSimple(use_dummy_model_component=True) + pipe = custom_block.init_pipeline("hf-internal-testing/tiny-flux-kontext-pipe") + pipe.load_components() + + assert len(pipe.components) == 1 + assert pipe.component_names[0] == "transformer" + + def test_custom_block_loads_from_hub(self): + repo_id = "hf-internal-testing/tiny-modular-diffusers-block" + block = ModularPipelineBlocks.from_pretrained(repo_id, trust_remote_code=True) + self._test_block_properties(block) + + pipe = block.init_pipeline() + + prompt = "Diffusers is nice" + output = pipe(prompt=prompt) + output_prompt = output.values["output_prompt"] + assert output_prompt.startswith("Modular diffusers + ") + + +@slow +@nightly +@require_torch +class TestKreaCustomBlocksIntegration: + repo_id = "krea/krea-realtime-video" + + def test_loading_from_hub(self): + blocks = ModularPipelineBlocks.from_pretrained(self.repo_id, trust_remote_code=True) + block_names = sorted(blocks.sub_blocks) + + assert block_names == sorted(["text_encoder", "before_denoise", "denoise", "decode"]) + + pipe = WanModularPipeline(blocks, self.repo_id) + pipe.load_components( + trust_remote_code=True, + device_map="cuda", + torch_dtype={"default": torch.bfloat16, "vae": torch.float16}, + ) + assert len(pipe.components) == 7 + assert sorted(pipe.components) == sorted( + ["text_encoder", "tokenizer", "guider", "scheduler", "vae", "transformer", "video_processor"] + ) + + def test_forward(self): + blocks = ModularPipelineBlocks.from_pretrained(self.repo_id, trust_remote_code=True) + pipe = WanModularPipeline(blocks, self.repo_id) + pipe.load_components( + trust_remote_code=True, + device_map="cuda", + torch_dtype={"default": torch.bfloat16, "vae": torch.float16}, + ) + + num_frames_per_block = 2 + num_blocks = 2 + + state = PipelineState() + state.set("frame_cache_context", deque(maxlen=pipe.config.frame_cache_len)) + + prompt = ["a cat sitting on a boat"] + + for block in pipe.transformer.blocks: + block.self_attn.fuse_projections() + + for block_idx in range(num_blocks): + state = pipe( + state, + prompt=prompt, + num_inference_steps=2, + num_blocks=num_blocks, + num_frames_per_block=num_frames_per_block, + block_idx=block_idx, + generator=torch.manual_seed(42), + ) + current_frames = np.array(state.values["videos"][0]) + current_frames_flat = current_frames.flatten() + actual_slices = np.concatenate([current_frames_flat[:4], current_frames_flat[-4:]]).tolist() + + if block_idx == 0: + assert current_frames.shape == (5, 480, 832, 3) + expected_slices = np.array([211, 229, 238, 208, 195, 180, 188, 193]) + else: + assert current_frames.shape == (8, 480, 832, 3) + expected_slices = np.array([179, 203, 214, 176, 194, 181, 187, 191]) + + assert np.allclose(actual_slices, expected_slices) diff --git a/tests/testing_utils.py b/tests/testing_utils.py index 951ba4128033..8f5df4887d88 100644 --- a/tests/testing_utils.py +++ b/tests/testing_utils.py @@ -24,6 +24,7 @@ import numpy as np import PIL.Image import PIL.ImageOps +import pytest import requests from numpy.linalg import norm from packaging import version @@ -267,7 +268,7 @@ def slow(test_case): Slow tests are skipped by default. Set the RUN_SLOW environment variable to a truthy value to run them. """ - return unittest.skipUnless(_run_slow_tests, "test is slow")(test_case) + return pytest.mark.skipif(not _run_slow_tests, reason="test is slow")(pytest.mark.slow(test_case)) def nightly(test_case): @@ -277,7 +278,7 @@ def nightly(test_case): Slow tests are skipped by default. Set the RUN_NIGHTLY environment variable to a truthy value to run them. """ - return unittest.skipUnless(_run_nightly_tests, "test is nightly")(test_case) + return pytest.mark.skipif(not _run_nightly_tests, reason="test is nightly")(pytest.mark.nightly(test_case)) def is_torch_compile(test_case):