Skip to content

Commit f1b6173

Browse files
committed
refactor: Camera args
1 parent 3fc8619 commit f1b6173

File tree

7 files changed

+50
-41
lines changed

7 files changed

+50
-41
lines changed

src/arduino/app_peripherals/camera/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ See the arduino.app_utils.image module for more supported adjustments.
7272
## Camera Types
7373
The Camera class provides automatic camera type detection based on the format of its source argument. keyword arguments will be propagated to the underlying implementation.
7474

75-
Note: constructor arguments (except source) must be provided in keyword format to forward them correctly to the specific camera implementations.
75+
Note: Camera's constructor arguments (except those in its signature) must be provided in keyword format to forward them correctly to the specific camera implementations.
7676

77-
The underlying camera implementations can be instantiated explicitly (V4LCamera, IPCamera and WebSocketCamera), if needed.
77+
The underlying camera implementations can also be instantiated explicitly (V4LCamera, IPCamera and WebSocketCamera), if needed.
7878

7979
### V4L Cameras
8080
For local USB cameras and V4L-compatible devices.

src/arduino/app_peripherals/camera/base_camera.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import threading
66
import time
77
from abc import ABC, abstractmethod
8-
from typing import Optional, Tuple, Callable
8+
from typing import Optional, Callable
99
import numpy as np
1010

1111
from arduino.app_utils import Logger
@@ -25,9 +25,9 @@ class BaseCamera(ABC):
2525

2626
def __init__(
2727
self,
28-
resolution: Optional[Tuple[int, int]] = (640, 480),
28+
resolution: tuple[int, int] = (640, 480),
2929
fps: int = 10,
30-
adjustments: Optional[Callable[[np.ndarray], np.ndarray]] = None,
30+
adjustments: Callable[[np.ndarray], np.ndarray] = None,
3131
):
3232
"""
3333
Initialize the camera base.
@@ -87,7 +87,7 @@ def capture(self) -> Optional[np.ndarray]:
8787
return None
8888
return frame
8989

90-
def _extract_frame(self) -> Optional[np.ndarray]:
90+
def _extract_frame(self) -> np.ndarray | None:
9191
"""Extract a frame with FPS throttling and post-processing."""
9292
with self._camera_lock:
9393
# FPS throttling

src/arduino/app_peripherals/camera/camera.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
#
33
# SPDX-License-Identifier: MPL-2.0
44

5-
from typing import Union
5+
from collections.abc import Callable
66
from urllib.parse import urlparse
77

8+
import numpy as np
9+
810
from .base_camera import BaseCamera
911
from .errors import CameraConfigError
1012

@@ -25,7 +27,14 @@ class Camera:
2527
format to forward them correctly to the specific camera implementations.
2628
"""
2729

28-
def __new__(cls, source: Union[str, int] = 0, **kwargs) -> BaseCamera:
30+
def __new__(
31+
cls,
32+
source: str | int = 0,
33+
resolution: tuple[int, int] = (640, 480),
34+
fps: int = 10,
35+
adjustments: Callable[[np.ndarray], np.ndarray] = None,
36+
**kwargs
37+
) -> BaseCamera:
2938
"""Create a camera instance based on the source type.
3039
3140
Args:
@@ -34,13 +43,12 @@ def __new__(cls, source: Union[str, int] = 0, **kwargs) -> BaseCamera:
3443
- str: V4L camera index (e.g., "0", "1") or device path (e.g., "/dev/video0")
3544
- str: URL for IP cameras (e.g., "rtsp://...", "http://...")
3645
- str: WebSocket URL for input streams (e.g., "ws://0.0.0.0:8080")
46+
resolution (tuple, optional): Frame resolution as (width, height).
47+
Default: (640, 480)
48+
fps (int, optional): Target frames per second. Default: 10
49+
adjustments (callable, optional): Function pipeline to adjust frames that takes a
50+
numpy array and returns a numpy array. Default: None
3751
**kwargs: Camera-specific configuration parameters grouped by type:
38-
Common Parameters:
39-
resolution (tuple, optional): Frame resolution as (width, height).
40-
Default: (640, 480)
41-
fps (int, optional): Target frames per second. Default: 10
42-
adjustments (callable, optional): Function pipeline to adjust frames that takes a
43-
numpy array and returns a numpy array. Default: None
4452
V4L Camera Parameters:
4553
device (int, optional): V4L device index override. Default: 0.
4654
IP Camera Parameters:
@@ -88,26 +96,26 @@ def __new__(cls, source: Union[str, int] = 0, **kwargs) -> BaseCamera:
8896
# V4L Camera
8997
from .v4l_camera import V4LCamera
9098

91-
return V4LCamera(source, **kwargs)
99+
return V4LCamera(source, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
92100
elif isinstance(source, str):
93101
parsed = urlparse(source)
94102
if parsed.scheme in ["http", "https", "rtsp"]:
95103
# IP Camera
96104
from .ip_camera import IPCamera
97105

98-
return IPCamera(source, **kwargs)
106+
return IPCamera(source, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
99107
elif parsed.scheme in ["ws", "wss"]:
100108
# WebSocket Camera - extract host and port from URL
101109
from .websocket_camera import WebSocketCamera
102110

103111
host = parsed.hostname or "localhost"
104112
port = parsed.port or 8080
105-
return WebSocketCamera(host=host, port=port, **kwargs)
113+
return WebSocketCamera(host=host, port=port, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
106114
elif source.startswith("/dev/video") or source.isdigit():
107115
# V4L device path or index as string
108116
from .v4l_camera import V4LCamera
109117

110-
return V4LCamera(source, **kwargs)
118+
return V4LCamera(source, resolution=resolution, fps=fps, adjustments=adjustments, **kwargs)
111119
else:
112120
raise CameraConfigError(f"Unsupported camera source: {source}")
113121
else:

src/arduino/app_peripherals/camera/examples/1_initialize.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@
1313
camera = Camera(2, resolution=(640, 480), fps=15) # Infers camera type
1414
v4l = V4LCamera(2, (640, 480), 15) # Explicitly requests V4L camera
1515

16-
# Note: constructor arguments (except source) must be provided in keyword
17-
# format to forward them correctly to the specific camera implementations.
16+
# Note: Camera's constructor arguments (except those in its signature)
17+
# must be provided in keyword format to forward them correctly to the
18+
# specific camera implementations.

src/arduino/app_peripherals/camera/ip_camera.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import cv2
66
import numpy as np
77
import requests
8-
from typing import Callable, Optional, Tuple
98
from urllib.parse import urlparse
9+
from collections.abc import Callable
1010

1111
from arduino.app_utils import Logger
1212

@@ -27,12 +27,12 @@ class IPCamera(BaseCamera):
2727
def __init__(
2828
self,
2929
url: str,
30-
username: Optional[str] = None,
31-
password: Optional[str] = None,
30+
username: str | None = None,
31+
password: str | None = None,
3232
timeout: int = 10,
33-
resolution: Optional[Tuple[int, int]] = (640, 480),
33+
resolution: tuple[int, int] = (640, 480),
3434
fps: int = 10,
35-
adjustments: Optional[Callable[[np.ndarray], np.ndarray]] = None,
35+
adjustments: Callable[[np.ndarray], np.ndarray] = None,
3636
):
3737
"""
3838
Initialize IP camera.
@@ -126,7 +126,7 @@ def _close_camera(self) -> None:
126126
self._cap.release()
127127
self._cap = None
128128

129-
def _read_frame(self) -> Optional[np.ndarray]:
129+
def _read_frame(self) -> np.ndarray | None:
130130
"""Read a frame from the IP camera with automatic reconnection."""
131131
if self._cap is None:
132132
logger.info(f"No connection to IP camera {self.url}, attempting to reconnect")

src/arduino/app_peripherals/camera/v4l_camera.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import re
77
import cv2
88
import numpy as np
9-
from typing import Callable, Optional, Tuple, Union, Dict
9+
from collections.abc import Callable
1010

1111
from arduino.app_utils import Logger
1212

@@ -26,10 +26,10 @@ class V4LCamera(BaseCamera):
2626

2727
def __init__(
2828
self,
29-
device: Union[str, int] = 0,
30-
resolution: Optional[Tuple[int, int]] = (640, 480),
29+
device: str | int = 0,
30+
resolution: tuple[int, int] = (640, 480),
3131
fps: int = 10,
32-
adjustments: Optional[Callable[[np.ndarray], np.ndarray]] = None,
32+
adjustments: Callable[[np.ndarray], np.ndarray] = None,
3333
):
3434
"""
3535
Initialize V4L camera.
@@ -49,7 +49,7 @@ def __init__(
4949

5050
self._cap = None
5151

52-
def _resolve_camera_id(self, device: Union[str, int]) -> int:
52+
def _resolve_camera_id(self, device: str | int) -> int:
5353
"""
5454
Resolve camera identifier to a numeric device ID.
5555
@@ -83,7 +83,7 @@ def _resolve_camera_id(self, device: Union[str, int]) -> int:
8383

8484
raise CameraOpenError(f"Cannot resolve camera identifier: {device}")
8585

86-
def _get_video_devices_by_index(self) -> Dict[int, str]:
86+
def _get_video_devices_by_index(self) -> dict[int, str]:
8787
"""
8888
Map camera indices to device numbers by reading /dev/v4l/by-id/.
8989
@@ -160,7 +160,7 @@ def _close_camera(self) -> None:
160160
self._cap.release()
161161
self._cap = None
162162

163-
def _read_frame(self) -> Optional[np.ndarray]:
163+
def _read_frame(self) -> np.ndarray | None:
164164
"""Read a frame from the V4L camera."""
165165
if self._cap is None:
166166
return None

src/arduino/app_peripherals/camera/websocket_camera.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
import threading
88
import queue
99
import time
10-
from typing import Callable, Optional, Tuple, Union
1110
import numpy as np
1211
import cv2
1312
import websockets
1413
import asyncio
14+
from collections.abc import Callable
1515
from concurrent.futures import CancelledError, TimeoutError
1616

1717
from arduino.app_utils import Logger
@@ -48,9 +48,9 @@ def __init__(
4848
port: int = 8080,
4949
timeout: int = 10,
5050
frame_format: str = "binary",
51-
resolution: Optional[Tuple[int, int]] = (640, 480),
51+
resolution: tuple[int, int] = (640, 480),
5252
fps: int = 10,
53-
adjustments: Optional[Callable[[np.ndarray], np.ndarray]] = None,
53+
adjustments: Callable[[np.ndarray], np.ndarray] = None,
5454
):
5555
"""
5656
Initialize WebSocket camera server.
@@ -78,7 +78,7 @@ def __init__(
7878
self._loop = None
7979
self._server_thread = None
8080
self._stop_event = asyncio.Event()
81-
self._client: Optional[websockets.ServerConnection] = None
81+
self._client: websockets.ServerConnection = None
8282
self._client_lock = asyncio.Lock()
8383

8484
def _open_camera(self) -> None:
@@ -195,7 +195,7 @@ async def _ws_handler(self, conn: websockets.ServerConnection) -> None:
195195
self._client = None
196196
logger.info(f"Client removed: {client_addr}")
197197

198-
async def _parse_message(self, message) -> Optional[np.ndarray]:
198+
async def _parse_message(self, message) -> np.ndarray | None:
199199
"""Parse WebSocket message to extract frame."""
200200
try:
201201
if self.frame_format == "base64":
@@ -297,7 +297,7 @@ async def _set_async_stop_event(self):
297297
await self._client.close()
298298
self._stop_event.set()
299299

300-
def _read_frame(self) -> Optional[np.ndarray]:
300+
def _read_frame(self) -> np.ndarray | None:
301301
"""Read a frame from the queue."""
302302
try:
303303
# Get frame with short timeout to avoid blocking
@@ -306,7 +306,7 @@ def _read_frame(self) -> Optional[np.ndarray]:
306306
except queue.Empty:
307307
return None
308308

309-
def _send_message_to_client(self, message: Union[str, bytes, dict]) -> None:
309+
def _send_message_to_client(self, message: str | bytes | dict) -> None:
310310
"""
311311
Send a message to the connected client (if any).
312312
@@ -333,7 +333,7 @@ def _send_message_to_client(self, message: Union[str, bytes, dict]) -> None:
333333
logger.error(f"Error sending message to client: {e}")
334334
raise
335335

336-
async def _send_to_client(self, message: Union[str, bytes, dict]) -> None:
336+
async def _send_to_client(self, message: str | bytes | dict) -> None:
337337
"""Send message to a single client."""
338338
if isinstance(message, dict):
339339
message = json.dumps(message)

0 commit comments

Comments
 (0)