Source code for duck.utils.filelock
"""
File lock capabilities module.
"""
import os
import time
import platform
if platform.system() == "Windows":
import msvcrt
else:
try:
import fcntl
except ImportError:
fcntl = None
[docs]
def lock_file(file_descriptor, retries=5, wait=1):
    """
    Lock file when modifying or accessing critical sections to avoid race conditions.

    A non-blocking exclusive lock is attempted up to ``retries`` times, with a
    pause of ``wait`` seconds between consecutive attempts. No pause is made
    after the final failed attempt, so the error is reported immediately.

    Args:
        file_descriptor: An open file object. On Windows it must provide
            ``fileno()`` and ``name``; elsewhere it is handed to
            ``fcntl.flock`` as-is.
        retries: Maximum number of lock attempts.
        wait: Seconds to sleep between two failed attempts.

    Raises:
        BlockingIOError: If the lock could not be acquired within ``retries``
            attempts. The last underlying error, if any, is attached as the
            cause.
    """
    if platform.system() == "Windows":

        def attempt():
            # The locked region length is the file's current size on disk.
            msvcrt.locking(
                file_descriptor.fileno(),
                msvcrt.LK_NBLCK,
                os.path.getsize(file_descriptor.name),
            )

        # On Windows every OSError raised by the attempt is treated as
        # "lock busy" and retried.
        retryable = OSError
    else:
        if not fcntl:
            # Best effort: without fcntl there is no locking primitive here.
            return

        def attempt():
            fcntl.flock(file_descriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)

        retryable = BlockingIOError

    last_error = None
    for attempt_number in range(retries):
        try:
            attempt()
        except retryable as error:
            last_error = error
            # Only wait when another attempt will actually follow.
            if attempt_number < retries - 1:
                time.sleep(wait)
        else:
            return

    raise BlockingIOError(
        f"Could not acquire lock after {retries} retries") from last_error
[docs]
def unlock_file(file_descriptor):
    """
    Unlock a locked file.

    Args:
        file_descriptor: The open file object that was previously locked.
            On Windows it must provide ``fileno()`` and ``name``; elsewhere
            it is handed to ``fcntl.flock`` as-is.
    """
    if platform.system() == "Windows":
        # The released region length is the file's current size on disk.
        msvcrt.locking(
            file_descriptor.fileno(),
            msvcrt.LK_UNLCK,
            os.path.getsize(file_descriptor.name),
        )
        return

    if not fcntl:
        # lock_file() is a silent no-op when fcntl is unavailable, so there
        # is nothing to release; mirror that instead of failing on None.
        return

    fcntl.flock(file_descriptor, fcntl.LOCK_UN)
[docs]
def open_and_lock(filename, mode="r+"):
    """
    Opens a file and acquires an exclusive lock.

    If the lock cannot be acquired, the file is closed again before the
    error is propagated, so no open handle is leaked.

    Args:
        filename: The name of the file to open.
        mode: The file open mode (default is 'r+').

    Returns:
        The opened file object.

    Raises:
        BlockingIOError: Propagated from ``lock_file`` when the lock could
            not be acquired.
    """
    fd = open(filename, mode)
    try:
        lock_file(fd)
    except BaseException:
        # Includes interrupts raised while waiting between attempts: the
        # caller never receives the handle, so it must be released here.
        fd.close()
        raise
    return fd