@@ -521,6 +521,7 @@ def process_monitor(self, sproc: "subprocess.Popen[str]", kill_switch: threading
521521 memory_usage : MutableSequence [Optional [int ]] = [None ]
522522
523523 mem_tm : "Optional[Timer]" = None
524+ ks_tm : "Optional[Timer]" = None
524525
525526 def get_tree_mem_usage (memory_usage : MutableSequence [Optional [int ]]) -> None :
526527 nonlocal mem_tm
@@ -542,10 +543,27 @@ def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
542543 if mem_tm is not None :
543544 mem_tm .cancel ()
544545
546+ def monitor_kill_switch () -> None :
547+ nonlocal ks_tm
548+ if kill_switch .is_set ():
549+ _logger .error ("[job %s] terminating by kill switch" , self .name )
550+ if sproc .stdin : sproc .stdin .close ()
551+ sproc .terminate ()
552+ else :
553+ ks_tm = Timer (interval = 1 , function = monitor_kill_switch )
554+ ks_tm .daemon = True
555+ ks_tm .start ()
556+
557+ ks_tm = Timer (interval = 1 , function = monitor_kill_switch )
558+ ks_tm .daemon = True
559+ ks_tm .start ()
560+
545561 mem_tm = Timer (interval = 1 , function = get_tree_mem_usage , args = (memory_usage ,))
546562 mem_tm .daemon = True
547563 mem_tm .start ()
564+
548565 sproc .wait ()
566+ ks_tm .cancel ()
549567 mem_tm .cancel ()
550568 if memory_usage [0 ] is not None :
551569 _logger .info (
@@ -859,20 +877,48 @@ def docker_monitor(
859877 process : "subprocess.Popen[str]" ,
860878 kill_switch : threading .Event ,
861879 ) -> None :
862- """Record memory usage of the running Docker container."""
880+ """Record memory usage of the running Docker container. Terminate if kill_switch is activated."""
881+
882+ ks_tm : "Optional[Timer]" = None
883+ cid : Optional [str ] = None
884+
885+ def monitor_kill_switch () -> None :
886+ nonlocal ks_tm
887+ if kill_switch .is_set ():
888+ _logger .error ("[job %s] terminating by kill switch" , self .name )
889+ if process .stdin :
890+ process .stdin .close ()
891+ if cid is not None :
892+ kill_proc = subprocess .Popen ( # nosec
893+ [docker_exe , "kill" , cid ], shell = False # nosec
894+ )
895+ try :
896+ kill_proc .wait (timeout = 10 )
897+ except subprocess .TimeoutExpired :
898+ kill_proc .kill ()
899+ process .terminate () # Always terminate, even if we tried with the cidfile
900+ else :
901+ ks_tm = Timer (interval = 1 , function = monitor_kill_switch )
902+ ks_tm .daemon = True
903+ ks_tm .start ()
904+
905+ ks_tm = Timer (interval = 1 , function = monitor_kill_switch )
906+ ks_tm .daemon = True
907+ ks_tm .start ()
908+
863909 # Todo: consider switching to `docker create` / `docker start`
864910 # instead of `docker run` as `docker create` outputs the container ID
865911 # to stdout, but the container is frozen, thus allowing us to start the
866912 # monitoring process without dealing with the cidfile or too-fast
867913 # container execution
868- cid : Optional [str ] = None
869914 while cid is None :
870915 time .sleep (1 )
871916 # This is needed to avoid a race condition where the job
872917 # was so fast that it already finished when it arrives here
873918 if process .returncode is None :
874919 process .poll ()
875920 if process .returncode is not None :
921+ ks_tm .cancel ()
876922 if cleanup_cidfile :
877923 try :
878924 os .remove (cidfile )
@@ -904,6 +950,9 @@ def docker_monitor(
904950 except OSError as exc :
905951 _logger .warning ("Ignored error with %s stats: %s" , docker_exe , exc )
906952 return
953+ finally :
954+ ks_tm .cancel ()
955+
907956 max_mem_percent : float = 0.0
908957 mem_percent : float = 0.0
909958 with open (stats_file_name ) as stats :
@@ -938,7 +987,7 @@ def _job_popen(
938987 job_script_contents : Optional [str ] = None ,
939988 timelimit : Optional [int ] = None ,
940989 name : Optional [str ] = None ,
941- monitor_function : Optional [Callable [["subprocess.Popen[str]" ], None ]] = None ,
990+ monitor_function : Optional [Callable [["subprocess.Popen[str]" , "threading.Event" ], None ]] = None ,
942991 default_stdout : Optional [Union [IO [bytes ], TextIO ]] = None ,
943992 default_stderr : Optional [Union [IO [bytes ], TextIO ]] = None ,
944993) -> int :
@@ -993,7 +1042,7 @@ def terminate(): # type: () -> None
9931042 tm .daemon = True
9941043 tm .start ()
9951044 if monitor_function :
996- monitor_function (sproc )
1045+ monitor_function (sproc , kill_switch )
9971046 rcode = sproc .wait ()
9981047
9991048 if tm is not None :
@@ -1069,7 +1118,7 @@ def terminate(): # type: () -> None
10691118 tm .daemon = True
10701119 tm .start ()
10711120 if monitor_function :
1072- monitor_function (sproc )
1121+ monitor_function (sproc , kill_switch )
10731122
10741123 rcode = sproc .wait ()
10751124
0 commit comments