"""
FileIOStream module.
Provides both synchronous and asynchronous file streaming interfaces.
Ideal for efficient reading of large files using chunked reads and supporting
standard `seek`, `tell`, and `close` operations in both environments.
**Methods that do not need to be async:**
Even in async context, the below methods don't necessarily need to be async:
1. `open` - Time complexity is O(1)
2. `seek` - Time complexity is O(1)
3. `tell` - Time complexity is O(1)
In async context, only `read`, `write`, and `close` need to be asynchronous.
"""
import io
import os
import asyncio
from typing import Optional
from duck.exceptions.all import AsyncViolationError
from duck.utils.asyncio import in_async_context
from duck.utils.threading import async_to_sync_future
from duck.contrib.sync import convert_to_async_if_needed
# TODO: Implement file caching (only on reads, on_write: just open the actual file descriptor).
# FILE_CACHE = InMemoryCache(maxkeys=1024)
[docs]
def to_async_fileio_stream(fileio_stream: "FileIOStream") -> "AsyncFileIOStream":
"""
Converts file_io_stream to async file io stream if not already async.
"""
assert isinstance(fileio_stream, FileIOStream), f"Provided file io stream not recognized, expected an instance of FileIOStream not {type(file_io_stream)}."
if isinstance(fileio_stream, AsyncFileIOStream):
return fileio_stream
new_stream = AsyncFileIOStream(
filepath=fileio_stream.filepath,
chunk_size=fileio_stream.chunk_size,
open_now=False,
mode=fileio_stream._mode,
)
if not new_stream._file_size:
# Set file size if not set
new_stream._file_size = fileio_stream._file_size
# Set _file
new_stream._file = fileio_stream._file
new_stream._pos = fileio_stream._pos
# Modify old strem __del__ to do nothing instead of raising
# "file must be closed before delete" error
fileio_stream.ignore_file_open_on_delete = True
# Return new stream
return new_stream
[docs]
class FileIOStream(io.IOBase):
"""
Synchronous file streaming class that mimics `io.IOBase`.
This class provides an interface to stream file contents using
standard file operations such as `read`, `write`, `seek`, `tell`, and `close`.
It is optimized for chunked reading of large files and is designed
to be used strictly in synchronous contexts.
"""
__slots__ = {
"filepath",
"chunk_size",
"open_now",
"ignore_file_open_on_delete",
"close_on_delete",
"_file",
"_pos",
"_mode",
"_file_size",
"_lock",
"_total_read_bytes",
}
def __init__(
self,
filepath: str,
chunk_size: int = 2 * 1024 * 1024,
open_now: bool = False,
mode: str = "rb",
):
"""
Initialize the FileIOStream object.
Args:
filepath (str): Path to the file to be streamed.
chunk_size (int): Maximum number of bytes to read or write at once. Default is 2MB.
open_now (bool): Whether to open the file immediately. Defaults to False.
mode (str): File open mode (default: 'rb').
"""
self.filepath = filepath
self.chunk_size = chunk_size
self.ignore_file_open_on_delete = False
self.close_on_delete = True # Closes fileio stream if still open on delete
self._file: Optional[io.BufferedIOBase] = None
self._pos = 0
self._mode = mode
self._file_size = os.path.getsize(filepath) if os.path.exists(filepath) else 0
self._total_read_bytes = None # Will be set on read
if open_now:
self.open()
[docs]
def is_open(self) -> bool:
"""
Check if the file is currently open.
"""
return self._file is not None
[docs]
def raise_if_in_async_context(self, message: str):
"""
Raise an error if used inside an async context.
"""
if in_async_context():
raise AsyncViolationError(message)
[docs]
def open(self):
"""
Open the file using the provided mode.
"""
if not self._file:
self._file = open(self.filepath, self._mode)
[docs]
def read(self, size: int = -1) -> bytes:
"""
Synchronously read data from the file.
Args:
size (int): Number of bytes to read. -1 reads all.
Returns:
bytes: File data.
"""
self.raise_if_in_async_context(
"This method must be used in a synchronous environment. "
"Consider using `AsyncFileIOStream.read` instead."
)
if not self.is_open():
raise ValueError("File not opened. Call `open()` first.")
data = self._file.read() if size == -1 else self._file.read(min(size, self.chunk_size))
self._pos += len(data)
# Record total read data
if self._total_read_bytes:
self._total_read_bytes = b"".join([b"", self._total_read_bytes])
else:
self._total_read_bytes = data
# Return data
return data
[docs]
def write(self, data: bytes) -> int:
"""
Synchronously write data to the file.
Args:
data (bytes): Data to write.
Returns:
int: Number of bytes written.
"""
self.raise_if_in_async_context(
"This method must be used in a synchronous environment. "
"Consider using `AsyncFileIOStream.write` instead."
)
if not self.is_open():
raise ValueError("File not opened. Call `open()` first.")
written = self._file.write(data)
self._pos += written
return written
[docs]
def seek(self, offset: int, whence: int = os.SEEK_SET):
"""
Move the file pointer to a new location.
"""
if not self.is_open():
raise ValueError("File not opened. Call `open()` first.")
self._file.seek(offset, whence)
self._pos = self._file.tell()
[docs]
def tell(self) -> int:
"""
Get the current position in the file.
"""
return self._pos
[docs]
def close(self):
"""
Close the file.
"""
self.raise_if_in_async_context(
"This method must be used in a synchronous environment. "
"Consider using `AsyncFileIOStream.close` instead."
)
if self._file:
self._file.close()
self._file = None
[docs]
def __del__(self):
"""
Ensure the file is closed on delete else it raises a RuntimeError if file not closed.
"""
if self.is_open() and not self.ignore_file_open_on_delete:
if self.close_on_delete:
try:
if not in_async_context:
self.close() # Close file io stream if stil open
else:
# If in async context, fire and forget close coro
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(convert_to_async_if_needed(self.close)(), loop)
return # Avoid raising runtime error
except Exception:
pass
raise RuntimeError("File is not closed yet, please ensure the file is closed before deletion.")
[docs]
class AsyncFileIOStream(FileIOStream):
"""
Asynchronous file streaming class.
Provides async-compatible methods for reading and writing files in
a non-blocking way using threads via `asyncio.to_thread`.
Notes:
- This implementation is compatible with context managers.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._lock = asyncio.Lock()
[docs]
async def async_open(self): # instead of overriding open
"""
Asynchronously open the file.
"""
if not self.is_open():
await convert_to_async_if_needed(super().open)(self.filepath, self._mode)
[docs]
async def read(self, size: int = -1) -> bytes:
"""
Asynchronously read from the file.
Args:
size (int): Max bytes to read. -1 reads full content.
Returns:
bytes: Data read from file.
"""
async with self._lock:
await self.async_open()
# Seek is very fast, no need to make it async
self._file.seek(self._pos)
if size == -1:
data = await convert_to_async_if_needed(self._file.read)()
else:
data = await convert_to_async_if_needed(self._file.read)(min(size, self.chunk_size))
self._pos += len(data)
# Record data read upto now
if self._total_read_bytes:
self._total_read_bytes = b"".join([b"", self._total_read_bytes])
else:
self._total_read_bytes = data
# Return data
return data
[docs]
async def write(self, data: bytes) -> int:
"""
Asynchronously write data to the file.
Args:
data (bytes): Bytes to write.
Returns:
int: Number of bytes written.
"""
async with self._lock:
await self.async_open()
# Seek musn't be async its very fast.
self._file.seek(self._pos)
written = await convert_to_async_if_needed(self._file.write)(data)
self._pos += written
return written
[docs]
async def close(self):
"""
Asynchronously close the file.
"""
async with self._lock:
if self.is_open():
await convert_to_async_if_needed(super().close)()
[docs]
async def __aenter__(self):
await self.async_open()
return self
[docs]
async def __aexit__(self, exc_type, exc, tb):
await self.close()