"""Streaming support for large file processing.
This module enables processing of files larger than available memory
by streaming chunks as they are parsed rather than loading everything.
Classes:
StreamingChunker: Process large files using memory-mapped I/O.
FileMetadata: Metadata about a processed file.
Functions:
chunk_file_streaming: Stream chunks from a file.
compute_file_hash: Compute SHA256 hash of a file.
get_file_metadata: Get metadata about a file.
"""
from __future__ import annotations
import hashlib
import mmap
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from .core import _build_retrieval_metadata, _extract_definition_name
from .parser import get_parser
from .types import CodeChunk, compute_node_id
if TYPE_CHECKING:
from collections.abc import Iterator
from tree_sitter import Node
@dataclass
class FileMetadata:
path: str
size: int
hash: str
mtime: float
def compute_file_hash(file_path: Path | str, chunk_size: int = 8192) -> str:
"""Compute SHA256 hash of a file.
Args:
file_path: Path to the file
chunk_size: Size of chunks to read (default: 8192)
"""
file_path = Path(file_path)
hash_obj = hashlib.sha256()
with file_path.open("rb") as f:
for chunk in iter(lambda: f.read(chunk_size), b""):
hash_obj.update(chunk)
return hash_obj.hexdigest()
def get_file_metadata(file_path: Path | str) -> FileMetadata:
"""Get metadata about a file."""
file_path = Path(file_path)
stat = file_path.stat()
return FileMetadata(
path=str(file_path),
size=stat.st_size,
hash=compute_file_hash(file_path),
mtime=stat.st_mtime,
)
class StreamingChunker:
"""Process large files using memory-mapped I/O for efficient streaming."""
def __init__(self, language: str):
self.language = language
self.parser = get_parser(language)
def _walk_streaming(
self,
node: Node,
mmap_data: mmap.mmap,
file_path: str,
parent_ctx: str | None = None,
parent_chunk: CodeChunk | None = None,
parent_route: list[str] | None = None,
parent_qualified_route: list[str] | None = None,
include_retrieval_metadata: bool = False,
) -> Iterator[CodeChunk]:
"""
Yield chunks as they're found without building full list in memory.
"""
chunk_types = {
"function_definition",
"class_definition",
"method_definition",
}
parent_route = (parent_route or []).copy()
parent_qualified_route = (parent_qualified_route or []).copy()
if node.type in chunk_types:
# Extract content from memory-mapped data
text = mmap_data[node.start_byte : node.end_byte].decode(
"utf-8",
errors="replace",
)
current_route = [*parent_route, node.type]
start_line = node.start_point[0] + 1
def_name = _extract_definition_name(node, mmap_data)
if def_name:
qualified_name = f"{node.type}:{def_name}"
else:
qualified_name = f"{node.type}:anon@{start_line}"
current_qualified_route = [*parent_qualified_route, qualified_name]
chunk = CodeChunk(
language=self.language,
file_path=file_path,
node_type=node.type,
start_line=start_line,
end_line=node.end_point[0] + 1,
byte_start=node.start_byte,
byte_end=node.end_byte,
parent_context=parent_ctx or "",
content=text,
parent_chunk_id=(parent_chunk.node_id if parent_chunk else None),
parent_route=current_route,
qualified_route=current_qualified_route,
)
# Ensure node_id reflects file path
chunk.node_id = compute_node_id(
file_path,
chunk.language,
chunk.parent_route,
chunk.content,
)
if include_retrieval_metadata:
chunk.metadata = _build_retrieval_metadata(chunk)
yield chunk
parent_ctx = node.type
parent_chunk = chunk
parent_route = current_route
parent_qualified_route = current_qualified_route
for child in node.children:
yield from self._walk_streaming(
child,
mmap_data,
file_path,
parent_ctx,
parent_chunk,
parent_route,
parent_qualified_route,
include_retrieval_metadata,
)
def chunk_file_streaming(
self,
path: Path,
include_retrieval_metadata: bool = False,
) -> Iterator[CodeChunk]:
"""Stream chunks from a file using memory-mapped I/O."""
# Check if file is empty
if path.stat().st_size == 0:
return
with (
Path(path).open("rb") as f,
mmap.mmap(
f.fileno(),
0,
access=mmap.ACCESS_READ,
) as mmap_data,
):
tree = self.parser.parse(mmap_data)
root = tree.root_node
yield from self._walk_streaming(
root,
mmap_data,
str(path),
include_retrieval_metadata=include_retrieval_metadata,
)
[docs]
def chunk_file_streaming(
path: str | Path,
language: str,
include_retrieval_metadata: bool = False,
) -> Iterator[CodeChunk]:
"""Stream chunks from a file without loading everything into memory."""
chunker = StreamingChunker(language)
yield from chunker.chunk_file_streaming(
Path(path),
include_retrieval_metadata=include_retrieval_metadata,
)