Module pysimt.utils.resource_mgr
Expand source code
import os
import sys
import signal
import atexit
import pathlib
import tempfile
import traceback
class ResourceManager:
    """Track temporary files and process IDs and release them on demand.

    An instance of this class is callable. When called, it first removes
    all previously registered temporary files and then sends ``SIGTERM``
    to the registered processes. A cleanup pass empties both registries,
    so calling the instance a second time does nothing.
    """

    def __init__(self):
        # Paths (``pathlib.Path``) of temporary files to delete on cleanup.
        self.temp_files = set()
        # PIDs to terminate on cleanup.
        self.processes = set()
        # Folder hosting the temporary files; $PYSIMT_TMP overrides /tmp.
        self._tmp_folder = pathlib.Path(os.environ.get('PYSIMT_TMP', '/tmp'))
        if not self._tmp_folder.exists():
            self._tmp_folder.mkdir(parents=True, exist_ok=True)

    def register_tmp_file(self, tmp_file: str):
        """Add the path `tmp_file` to the set of tracked temporary files."""
        self.temp_files.add(pathlib.Path(tmp_file))

    def register_proc(self, pid: int):
        """Add the process ID `pid` to the set of tracked processes."""
        self.processes.add(pid)

    def unregister_proc(self, pid: int):
        """Stop tracking the process ID `pid`.

        Unknown PIDs are ignored, so unregistering twice (or after a
        cleanup pass already emptied the registry) is harmless.
        """
        self.processes.discard(pid)

    def get_temp_file(self, delete: bool = False, close: bool = False):
        """Create and register a new temporary file in the temp folder.

        The file is opened in text write mode and named
        ``pysimt_<pid><random suffix>``.

        Args:
            delete: Forwarded to `tempfile.NamedTemporaryFile`.
            close: If `True`, close the file object before returning it.

        Returns:
            The `tempfile.NamedTemporaryFile` wrapper object.
        """
        # Pass the folder through `dir` rather than baking it into `prefix`:
        # a prefix containing a relative folder would be joined onto the
        # system default temp directory instead of the configured folder.
        t = tempfile.NamedTemporaryFile(
            mode='w', dir=str(self._tmp_folder),
            prefix=f"pysimt_{os.getpid()}", delete=delete)
        self.register_tmp_file(t.name)
        if close:
            t.close()
        return t

    def __call__(self):
        """Remove registered temp files, then SIGTERM registered PIDs.

        Cleanup is best-effort: a file that cannot be removed or a process
        that cannot be signalled does not stop the remaining resources
        from being released.
        """
        # Detach both registries up front so that a repeated or nested call
        # sees empty sets and never signals the same PID twice.
        tmp_files, self.temp_files = self.temp_files, set()
        procs, self.processes = self.processes, set()

        for tmp_file in tmp_files:
            try:
                tmp_file.unlink()
            except OSError:
                # Already gone or not removable; keep releasing the rest.
                pass

        for proc in procs:
            try:
                os.kill(proc, signal.SIGTERM)
            except (ProcessLookupError, PermissionError):
                # Process already exited, or the PID is not ours to signal.
                pass

    def __repr__(self):
        parts = ["Resource Manager\n"]
        if self.processes:
            parts.append("Tracking Processes\n")
            parts.extend(" {}\n".format(proc) for proc in self.processes)
        if self.temp_files:
            parts.append("Tracking Temporary Files\n")
            parts.extend(
                " {}\n".format(tmp_file) for tmp_file in self.temp_files)
        return "".join(parts)

    @staticmethod
    def _register_exception_handler(logger, quit_on_exception=False):
        """Install a `sys.excepthook` that logs uncaught exceptions.

        Args:
            logger: Object whose `info()` method receives the formatted
                traceback.
            quit_on_exception: If `True`, the hook calls `sys.exit(1)`
                after logging.
        """
        def exception_handler(exctype, val, trace):
            """Let Python call this when an exception is uncaught."""
            logger.info(
                ''.join(traceback.format_exception(exctype, val, trace)))
            if quit_on_exception:
                sys.exit(1)

        sys.excepthook = exception_handler

    @staticmethod
    def register_handler(logger):
        """Register atexit and signal handlers."""
        # Register exit handler
        atexit.register(res_mgr)

        # Register SIGINT and SIGTERM
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, _signal_handler)

        ResourceManager._register_exception_handler(logger)
# Module-level ResourceManager instance: the one registered with atexit by
# `ResourceManager.register_handler` and invoked by `_signal_handler`.
res_mgr = ResourceManager()
def _signal_handler(signum, frame):
    """Run the module-level cleanup, then exit with status 0.

    Installed for SIGINT and SIGTERM; both arguments are supplied by the
    `signal` machinery and are not used.
    """
    res_mgr()
    raise SystemExit(0)
Classes
class ResourceManager
-
A utility class to manage process and file related resources.
An instance of this class is callable and when called, it first removes all previously registered temporary files and then kills the registered processes.
Expand source code
class ResourceManager:
    """Track temporary files and process IDs and release them on demand.

    An instance of this class is callable. When called, it first removes
    all previously registered temporary files and then sends ``SIGTERM``
    to the registered processes. A cleanup pass empties both registries,
    so calling the instance a second time does nothing.
    """

    def __init__(self):
        # Paths (``pathlib.Path``) of temporary files to delete on cleanup.
        self.temp_files = set()
        # PIDs to terminate on cleanup.
        self.processes = set()
        # Folder hosting the temporary files; $PYSIMT_TMP overrides /tmp.
        self._tmp_folder = pathlib.Path(os.environ.get('PYSIMT_TMP', '/tmp'))
        if not self._tmp_folder.exists():
            self._tmp_folder.mkdir(parents=True, exist_ok=True)

    def register_tmp_file(self, tmp_file: str):
        """Add the path `tmp_file` to the set of tracked temporary files."""
        self.temp_files.add(pathlib.Path(tmp_file))

    def register_proc(self, pid: int):
        """Add the process ID `pid` to the set of tracked processes."""
        self.processes.add(pid)

    def unregister_proc(self, pid: int):
        """Stop tracking the process ID `pid`.

        Unknown PIDs are ignored, so unregistering twice (or after a
        cleanup pass already emptied the registry) is harmless.
        """
        self.processes.discard(pid)

    def get_temp_file(self, delete: bool = False, close: bool = False):
        """Create and register a new temporary file in the temp folder.

        The file is opened in text write mode and named
        ``pysimt_<pid><random suffix>``.

        Args:
            delete: Forwarded to `tempfile.NamedTemporaryFile`.
            close: If `True`, close the file object before returning it.

        Returns:
            The `tempfile.NamedTemporaryFile` wrapper object.
        """
        # Pass the folder through `dir` rather than baking it into `prefix`:
        # a prefix containing a relative folder would be joined onto the
        # system default temp directory instead of the configured folder.
        t = tempfile.NamedTemporaryFile(
            mode='w', dir=str(self._tmp_folder),
            prefix=f"pysimt_{os.getpid()}", delete=delete)
        self.register_tmp_file(t.name)
        if close:
            t.close()
        return t

    def __call__(self):
        """Remove registered temp files, then SIGTERM registered PIDs.

        Cleanup is best-effort: a file that cannot be removed or a process
        that cannot be signalled does not stop the remaining resources
        from being released.
        """
        # Detach both registries up front so that a repeated or nested call
        # sees empty sets and never signals the same PID twice.
        tmp_files, self.temp_files = self.temp_files, set()
        procs, self.processes = self.processes, set()

        for tmp_file in tmp_files:
            try:
                tmp_file.unlink()
            except OSError:
                # Already gone or not removable; keep releasing the rest.
                pass

        for proc in procs:
            try:
                os.kill(proc, signal.SIGTERM)
            except (ProcessLookupError, PermissionError):
                # Process already exited, or the PID is not ours to signal.
                pass

    def __repr__(self):
        parts = ["Resource Manager\n"]
        if self.processes:
            parts.append("Tracking Processes\n")
            parts.extend(" {}\n".format(proc) for proc in self.processes)
        if self.temp_files:
            parts.append("Tracking Temporary Files\n")
            parts.extend(
                " {}\n".format(tmp_file) for tmp_file in self.temp_files)
        return "".join(parts)

    @staticmethod
    def _register_exception_handler(logger, quit_on_exception=False):
        """Install a `sys.excepthook` that logs uncaught exceptions.

        Args:
            logger: Object whose `info()` method receives the formatted
                traceback.
            quit_on_exception: If `True`, the hook calls `sys.exit(1)`
                after logging.
        """
        def exception_handler(exctype, val, trace):
            """Let Python call this when an exception is uncaught."""
            logger.info(
                ''.join(traceback.format_exception(exctype, val, trace)))
            if quit_on_exception:
                sys.exit(1)

        sys.excepthook = exception_handler

    @staticmethod
    def register_handler(logger):
        """Register atexit and signal handlers."""
        # Register exit handler
        atexit.register(res_mgr)

        # Register SIGINT and SIGTERM
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, _signal_handler)

        ResourceManager._register_exception_handler(logger)
Static methods
def register_handler(logger)
-
Register atexit and signal handlers.
Expand source code
@staticmethod
def register_handler(logger):
    """Hook the module-level cleaner into process exit, signals and errors.

    Registers `res_mgr` with `atexit`, installs `_signal_handler` for
    SIGINT and SIGTERM, and sets up the uncaught-exception hook that logs
    through `logger`.
    """
    # Exit handler
    atexit.register(res_mgr)

    # SIGINT and SIGTERM, in that order
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _signal_handler)

    ResourceManager._register_exception_handler(logger)
Methods
def get_temp_file(self, delete: bool = False, close: bool = False)
-
Creates a new temporary file under the $PYSIMT_TMP folder (/tmp when the variable is unset).
Expand source code
def get_temp_file(self, delete: bool = False, close: bool = False): """Creates a new temporary file under $PYSIMT_TMP folder.""" prefix = str(self._tmp_folder / f"pysimt_{os.getpid()}") t = tempfile.NamedTemporaryFile( mode='w', prefix=prefix, delete=delete) self.register_tmp_file(t.name) if close: t.close() return t
def register_proc(self, pid: int)
-
Add a new process ID to the set of tracked processes.
Expand source code
def register_proc(self, pid: int): """Add new process to global set.""" self.processes.add(pid)
def register_tmp_file(self, tmp_file: str)
-
Add a new temporary file to the set of tracked temporary files.
Expand source code
def register_tmp_file(self, tmp_file: str): """Add new temporary file to global set.""" self.temp_files.add(pathlib.Path(tmp_file))
def unregister_proc(self, pid: int)
-
Remove the given PID from the set of tracked processes.
Expand source code
def unregister_proc(self, pid: int): """Remove given PID from global set.""" self.processes.remove(pid)