22#
33# SPDX-License-Identifier: MPL-2.0
44
5- from arduino .app_utils import brick , Logger
6- from arduino .app_internal .core import load_brick_compose_file , resolve_address
7- from arduino .app_internal .core import EdgeImpulseRunnerFacade
8- import threading
95import time
6+ import json
7+ import inspect
8+ import threading
9+ import socket
1010from typing import Callable
11+
1112from websockets .sync .client import connect , ClientConnection
1213from websockets .exceptions import ConnectionClosedOK , ConnectionClosedError
13- import json
14- import inspect
14+
15+ from arduino .app_peripherals .camera import Camera
16+ from arduino .app_internal .core import load_brick_compose_file , resolve_address
17+ from arduino .app_internal .core import EdgeImpulseRunnerFacade
18+ from arduino .app_utils .image import compress_to_jpeg
19+ from arduino .app_utils import brick , Logger
1520
1621logger = Logger ("VideoImageClassification" )
1722
@@ -25,17 +30,20 @@ class VideoImageClassification:
2530
2631 ALL_HANDLERS_KEY = "__ALL"
2732
28- def __init__ (self , confidence : float = 0.3 , debounce_sec : float = 0.0 ):
33+ def __init__ (self , camera : Camera = None , confidence : float = 0.3 , debounce_sec : float = 0.0 ):
2934 """Initialize the VideoImageClassification class.
3035
3136 Args:
37+ camera (Camera): The camera instance to use for capturing video. If None, a default camera will be initialized.
3238 confidence (float): The minimum confidence level for a classification to be considered valid. Default is 0.3.
3339 debounce_sec (float): The minimum time in seconds between consecutive detections of the same object
3440 to avoid multiple triggers. Default is 0 seconds.
3541
3642 Raises:
3743 RuntimeError: If the host address could not be resolved.
3844 """
45+ self ._camera = camera if camera else Camera ()
46+
3947 self ._confidence = confidence
4048 self ._debounce_sec = debounce_sec
4149 self ._last_detected = {}
@@ -114,40 +122,26 @@ def on_detect(self, object: str, callback: Callable[[], None]):
114122 self ._handlers [object ] = callback
115123
116124 def start (self ):
117- """Start the classification stream.
118-
119- This only sets the internal running flag. You must call
120- `execute` in a loop or a separate thread to actually begin receiving classification results.
121- """
125+ """Start the classification."""
126+ self ._camera .start ()
122127 self ._is_running .set ()
123128
124129 def stop (self ):
125- """Stop the classification stream and release resources.
126-
127- This clears the running flag. Any active `execute` loop
128- will exit gracefully at its next iteration.
129- """
130+ """Stop the classification and release resources."""
130131 self ._is_running .clear ()
132+ self ._camera .stop ()
131133
132- def execute (self ):
133- """Run the main classification loop.
134-
135- Behavior:
136- - Opens a WebSocket connection to the model runner.
137- - Receives classification messages in real time.
138- - Filters classifications below the confidence threshold.
139- - Applies debounce rules before invoking callbacks.
140- - Retries on transient connection errors until stopped.
141-
142- Exceptions:
143- ConnectionClosedOK:
144- Raised to exit when the server closes the connection cleanly.
145- ConnectionClosedError, TimeoutError, ConnectionRefusedError:
146- Logged and retried with backoff.
134+ @brick .execute
135+ def classification_loop (self ):
136+ """Classification main loop.
137+
138+ Maintains WebSocket connection to the model runner and processes classification messages.
139+ Retries on connection errors until stopped.
147140 """
148141 while self ._is_running .is_set ():
149142 try :
150143 with connect (self ._uri ) as ws :
144+ logger .info ("WebSocket connection established" )
151145 while self ._is_running .is_set ():
152146 try :
153147 message = ws .recv ()
@@ -157,21 +151,56 @@ def execute(self):
157151 except ConnectionClosedOK :
158152 raise
159153 except (TimeoutError , ConnectionRefusedError , ConnectionClosedError ):
160- logger .warning (f"Connection lost. Retrying..." )
154+ logger .warning (f"WebSocket connection lost. Retrying..." )
161155 raise
162156 except Exception as e :
163157 logger .exception (f"Failed to process detection: { e } " )
164158 except ConnectionClosedOK :
165- logger .debug (f"Disconnected cleanly, exiting WebSocket read loop." )
159+ logger .debug (f"WebSocket disconnected cleanly, exiting loop." )
166160 return
167161 except (TimeoutError , ConnectionRefusedError , ConnectionClosedError ):
168162 logger .debug (f"Waiting for model runner. Retrying..." )
169- import time
170-
171163 time .sleep (2 )
172164 continue
173165 except Exception as e :
174166 logger .exception (f"Failed to establish WebSocket connection to { self ._host } : { e } " )
167+ time .sleep (2 )
168+
169+ @brick .execute
170+ def camera_loop (self ):
171+ """Camera main loop.
172+
173+ Captures images from the camera and forwards them over the TCP connection.
174+ Retries on connection errors until stopped.
175+ """
176+ while self ._is_running .is_set ():
177+ try :
178+ with socket .socket (socket .AF_INET , socket .SOCK_STREAM ) as tcp_socket :
179+ tcp_socket .connect ((self ._host , "5050" ))
180+ logger .info (f"TCP connection established to { self ._host } :5050" )
181+
182+ while self ._is_running .is_set ():
183+ try :
184+ frame = self ._camera .capture ()
185+ if frame is None :
186+ time .sleep (0.01 ) # Brief sleep if no image available
187+ continue
188+
189+ jpeg_frame = compress_to_jpeg (frame )
190+ tcp_socket .sendall (jpeg_frame .tobytes ())
191+
192+ except (BrokenPipeError , ConnectionResetError , OSError ) as e :
193+ logger .warning (f"TCP connection lost: { e } . Retrying..." )
194+ break
195+ except Exception as e :
196+ logger .exception (f"Error capturing/sending image: { e } " )
197+
198+ except (ConnectionRefusedError , OSError ) as e :
199+ logger .debug (f"TCP connection failed: { e } . Retrying in 2 seconds..." )
200+ time .sleep (2 )
201+ except Exception as e :
202+ logger .exception (f"Unexpected error in TCP loop: { e } " )
203+ time .sleep (2 )
175204
176205 def _process_message (self , ws : ClientConnection , message : str ):
177206 jmsg = json .loads (message )
0 commit comments