Data Adapters

openlithohub.data.base

Abstract base class for dataset adapters.

LithoSample dataclass

A single lithography sample with unified tensor representation.

Source code in src/openlithohub/data/base.py
@dataclass
class LithoSample:
    """A single lithography sample with unified tensor representation."""

    design: torch.Tensor
    mask: torch.Tensor | None = None
    resist: torch.Tensor | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
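
For illustration, a minimal sketch of constructing a sample directly (only design is required; mask and resist default to None):

import torch

from openlithohub.data.base import LithoSample

sample = LithoSample(
    design=torch.zeros(256, 256),      # design layout tensor (H, W)
    metadata={"pixel_nm": 1.0},        # free-form per-sample metadata
)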

DatasetAdapter

Bases: ABC

Abstract adapter for lithography datasets.

Subclasses must implement __len__ and __getitem__ to provide unified PyTorch Tensor access regardless of underlying format.

Source code in src/openlithohub/data/base.py
class DatasetAdapter(ABC):
    """Abstract adapter for lithography datasets.

    Subclasses must implement __len__ and __getitem__ to provide
    unified PyTorch Tensor access regardless of underlying format.
    """

    @abstractmethod
    def __len__(self) -> int: ...

    @abstractmethod
    def __getitem__(self, index: int) -> LithoSample: ...

    def __iter__(self) -> Iterator[LithoSample]:
        for i in range(len(self)):
            yield self[i]

    @abstractmethod
    def download(self, root: str) -> None:
        """Download dataset to the specified root directory."""
        ...

download(root) abstractmethod

Download dataset to the specified root directory.

Source code in src/openlithohub/data/base.py
@abstractmethod
def download(self, root: str) -> None:
    """Download dataset to the specified root directory."""
    ...
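
As a sketch, a minimal in-memory adapter might look like the hypothetical class below (not part of the package); iteration comes for free from the base class's __iter__:

import torch

from openlithohub.data.base import DatasetAdapter, LithoSample

class InMemoryDataset(DatasetAdapter):
    """Hypothetical adapter wrapping a list of design tensors."""

    def __init__(self, designs: list[torch.Tensor]) -> None:
        self._designs = designs

    def __len__(self) -> int:
        return len(self._designs)

    def __getitem__(self, index: int) -> LithoSample:
        return LithoSample(design=self._designs[index])

    def download(self, root: str) -> None:
        pass  # nothing to download for in-memory data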

openlithohub.data.lithobench

LithoBench dataset adapter (.npy format).

LithoBench (NeurIPS'23) organizes data as paired .npy arrays per sample:

    root/
        design/
            sample_0000.npy    # binary design layout (H, W)
            sample_0001.npy
            ...
        mask/
            sample_0000.npy    # optimized mask (H, W), may not exist for all samples
            ...
        resist/
            sample_0000.npy    # simulated resist contour (H, W), optional
            ...
        metadata.json          # optional: per-sample process parameters

Alternatively, a flat layout is supported:

    root/
        sample_0000_design.npy
        sample_0000_mask.npy
        sample_0000_resist.npy
        ...

LithoBenchDataset

Bases: DatasetAdapter

Adapter for the LithoBench dataset (NeurIPS'23, 45nm baseline).

Supports two directory layouts:

1. Subdirectory layout: root/{design,mask,resist}/sample_XXXX.npy
2. Flat layout: root/sample_XXXX_{design,mask,resist}.npy

Parameters:

root (str | Path, required): Path to the dataset directory.

split (str | None, default None): Optional split name (e.g. 'train', 'test'). If set, looks for root/split/.

pixel_nm (float, default 1.0): Pixel resolution in nanometers (1.0 for the LithoBench 45nm node).
Source code in src/openlithohub/data/lithobench.py
class LithoBenchDataset(DatasetAdapter):
    """Adapter for the LithoBench dataset (NeurIPS'23, 45nm baseline).

    Supports two directory layouts:
    1. Subdirectory layout: root/{design,mask,resist}/sample_XXXX.npy
    2. Flat layout: root/sample_XXXX_{design,mask,resist}.npy

    Args:
        root: Path to the dataset directory.
        split: Optional split name (e.g. 'train', 'test'). If set, looks for root/split/.
        pixel_nm: Pixel resolution in nanometers (default 1.0 for LithoBench 45nm node).
    """

    def __init__(
        self,
        root: str | Path,
        split: str | None = None,
        pixel_nm: float = 1.0,
    ) -> None:
        self.root = Path(root)
        if split:
            self.root = self.root / split
        self.pixel_nm = pixel_nm
        self._index: list[str] = []
        self._layout: str = "unknown"
        self._metadata: dict[str, Any] = {}
        self._build_index()

    def _build_index(self) -> None:
        if not self.root.exists():
            raise FileNotFoundError(f"Dataset root not found: {self.root}")

        design_dir = self.root / "design"
        if design_dir.is_dir():
            self._layout = "subdirectory"
            self._index = sorted(p.stem for p in design_dir.glob("*.npy"))
        else:
            self._layout = "flat"
            seen: set[str] = set()
            for p in self.root.glob("*.npy"):
                m = _FILENAME_RE.match(p.name)
                if m and m.group("kind") == "design":
                    seen.add(m.group("sample_id"))
            self._index = sorted(seen)

        meta_path = self.root / "metadata.json"
        if meta_path.exists():
            with open(meta_path) as f:
                self._metadata = json.load(f)

    def __len__(self) -> int:
        return len(self._index)

    def __getitem__(self, index: int) -> LithoSample:
        if index < 0 or index >= len(self._index):
            raise IndexError(f"Index {index} out of range [0, {len(self._index)})")

        sample_id = self._index[index]
        design = self._load_array(sample_id, "design")
        mask = self._try_load_array(sample_id, "mask")
        resist = self._try_load_array(sample_id, "resist")

        metadata: dict[str, Any] = {
            "dataset": "lithobench",
            "sample_id": sample_id,
            "pixel_nm": self.pixel_nm,
        }
        if sample_id in self._metadata:
            metadata.update(self._metadata[sample_id])

        return LithoSample(
            design=torch.from_numpy(design).float(),
            mask=torch.from_numpy(mask).float() if mask is not None else None,
            resist=torch.from_numpy(resist).float() if resist is not None else None,
            metadata=metadata,
        )

    def _resolve_path(self, sample_id: str, kind: str) -> Path:
        if self._layout == "subdirectory":
            return self.root / kind / f"{sample_id}.npy"
        return self.root / f"{sample_id}_{kind}.npy"

    def _load_array(self, sample_id: str, kind: str) -> np.ndarray:
        path = self._resolve_path(sample_id, kind)
        if not path.exists():
            raise FileNotFoundError(f"Required file not found: {path}")
        return np.load(path)

    def _try_load_array(self, sample_id: str, kind: str) -> np.ndarray | None:
        path = self._resolve_path(sample_id, kind)
        if path.exists():
            return np.load(path)
        return None

    def download(self, root: str) -> None:
        raise NotImplementedError(
            "LithoBench auto-download not yet implemented. "
            "Please download manually from: https://github.com/phdyang007/lithobench"
        )

    @property
    def sample_ids(self) -> list[str]:
        return list(self._index)
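
A typical usage sketch (the root path is a placeholder; the dataset must be downloaded manually, see download below):

from openlithohub.data.lithobench import LithoBenchDataset

ds = LithoBenchDataset(root="data/lithobench", split="train")
print(len(ds), ds.sample_ids[:3])

sample = ds[0]
print(sample.design.shape, sample.metadata["pixel_nm"])
if sample.mask is not None:
    print(sample.mask.shape)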

openlithohub.data.lithosim

LithoSim dataset adapter (HuggingFace Parquet format).

LithoSim is a sub-28nm industrial lithography simulation dataset hosted on HuggingFace Hub. It stores design, mask, and resist images as Parquet rows with image columns and process metadata.

Requires: pip install openlithohub[data] (adds datasets and pyarrow)

LithoSimDataset

Bases: DatasetAdapter

Adapter for the LithoSim dataset (sub-28nm industrial benchmark).

Loads data from HuggingFace Hub using the datasets library. Images are stored as columns in Parquet format and decoded to tensors on access.

Parameters:

split (str, default 'test'): Dataset split ('train', 'test', or 'all').

dataset_name (str, default _HF_DATASET_NAME): HuggingFace dataset identifier. Override for custom forks.

cache_dir (str | None, default None): Local cache directory for downloaded data.

pixel_nm (float, default 0.5): Pixel resolution in nanometers.

streaming (bool, default False): If True, use streaming mode (no full download).
Source code in src/openlithohub/data/lithosim.py
class LithoSimDataset(DatasetAdapter):
    """Adapter for the LithoSim dataset (sub-28nm industrial benchmark).

    Loads data from HuggingFace Hub using the `datasets` library.
    Images are stored as columns in Parquet format and decoded to tensors on access.

    Args:
        split: Dataset split ('train', 'test', or 'all').
        dataset_name: HuggingFace dataset identifier. Override for custom forks.
        cache_dir: Local cache directory for downloaded data.
        pixel_nm: Pixel resolution in nanometers.
        streaming: If True, use streaming mode (no full download).
    """

    def __init__(
        self,
        split: str = "test",
        dataset_name: str = _HF_DATASET_NAME,
        cache_dir: str | None = None,
        pixel_nm: float = 0.5,
        streaming: bool = False,
    ) -> None:
        _ensure_datasets_available()
        self.split = split
        self.dataset_name = dataset_name
        self.cache_dir = cache_dir
        self.pixel_nm = pixel_nm
        self.streaming = streaming
        self._ds: Any = None
        self._len: int | None = None

    def _load_dataset(self) -> Any:
        if self._ds is None:
            from datasets import load_dataset

            self._ds = load_dataset(
                self.dataset_name,
                split=self.split,
                cache_dir=self.cache_dir,
                streaming=self.streaming,
            )
        return self._ds

    def __len__(self) -> int:
        if self._len is not None:
            return self._len
        ds = self._load_dataset()
        self._len = len(ds)
        return self._len

    def __getitem__(self, index: int) -> LithoSample:
        ds = self._load_dataset()

        if index < 0 or index >= len(self):
            raise IndexError(f"Index {index} out of range [0, {len(self)})")

        row = ds[index]
        design = self._decode_image(row, "design")
        mask = self._try_decode_image(row, "mask")
        resist = self._try_decode_image(row, "resist")

        metadata: dict[str, Any] = {
            "dataset": "lithosim",
            "pixel_nm": self.pixel_nm,
            "split": self.split,
        }
        for key in ("process_node", "pitch_nm", "dose", "focus", "sample_id", "feature_type"):
            if key in row:
                metadata[key] = row[key]

        return LithoSample(
            design=design,
            mask=mask,
            resist=resist,
            metadata=metadata,
        )

    def _decode_image(self, row: dict[str, Any], column: str) -> torch.Tensor:
        if column not in row:
            raise KeyError(f"Required column '{column}' not found in dataset row")
        return self._to_tensor(row[column])

    def _try_decode_image(self, row: dict[str, Any], column: str) -> torch.Tensor | None:
        if column not in row or row[column] is None:
            return None
        return self._to_tensor(row[column])

    @staticmethod
    def _to_tensor(value: Any) -> torch.Tensor:
        if isinstance(value, np.ndarray):
            arr = value.astype(np.float32)
            if arr.size > 0 and arr.max() > 1.0:
                arr = arr / 255.0
            return torch.from_numpy(arr)

        if isinstance(value, (list, tuple)):
            return torch.tensor(value, dtype=torch.float32)

        try:
            from PIL import Image
        except ImportError as e:
            raise ImportError(
                "Pillow is required for image decoding. Install with: pip install Pillow"
            ) from e

        if isinstance(value, Image.Image):
            arr = np.array(value, dtype=np.float32)
            if arr.size > 0 and arr.max() > 1.0:
                arr = arr / 255.0
            return torch.from_numpy(arr)

        if isinstance(value, dict) and "bytes" in value:
            import io

            img = Image.open(io.BytesIO(value["bytes"]))
            arr = np.array(img, dtype=np.float32)
            if arr.size > 0 and arr.max() > 1.0:
                arr = arr / 255.0
            return torch.from_numpy(arr)

        raise TypeError(f"Cannot convert {type(value)} to tensor")

    def download(self, root: str) -> None:
        from datasets import load_dataset

        load_dataset(
            self.dataset_name,
            split=self.split,
            cache_dir=root,
        )

    @property
    def columns(self) -> list[str]:
        ds = self._load_dataset()
        return ds.column_names
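
A usage sketch (requires the optional data extras via pip install openlithohub[data]; the HuggingFace download happens lazily on first access):

from openlithohub.data.lithosim import LithoSimDataset

ds = LithoSimDataset(split="test", streaming=False)
print(len(ds), ds.columns)

sample = ds[0]
print(sample.design.shape, sample.metadata["pixel_nm"])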

openlithohub.data.transforms

Data transforms for resolution alignment and normalization.

align_resolution(tensor, source_pixel_nm, target_pixel_nm, mode='bilinear')

Resample a tensor to match target pixel resolution.

Parameters:

tensor (Tensor, required): Input tensor (H, W) or (C, H, W).

source_pixel_nm (float, required): Current pixel size in nanometers.

target_pixel_nm (float, required): Desired pixel size in nanometers.

mode (str, default 'bilinear'): Interpolation mode ('bilinear', 'nearest', 'bicubic').

Returns:

Tensor: Resampled tensor at the target resolution.

Source code in src/openlithohub/data/transforms.py
def align_resolution(
    tensor: torch.Tensor,
    source_pixel_nm: float,
    target_pixel_nm: float,
    mode: str = "bilinear",
) -> torch.Tensor:
    """Resample a tensor to match target pixel resolution.

    Args:
        tensor: Input tensor (H, W) or (C, H, W).
        source_pixel_nm: Current pixel size in nanometers.
        target_pixel_nm: Desired pixel size in nanometers.
        mode: Interpolation mode ('bilinear', 'nearest', 'bicubic').

    Returns:
        Resampled tensor at the target resolution.
    """
    if source_pixel_nm <= 0 or target_pixel_nm <= 0:
        raise ValueError("Pixel sizes must be positive")

    scale = source_pixel_nm / target_pixel_nm
    if abs(scale - 1.0) < 1e-6:
        return tensor

    ndim = tensor.ndim
    if ndim == 2:
        x = tensor.unsqueeze(0).unsqueeze(0)
    elif ndim == 3:
        x = tensor.unsqueeze(0)
    else:
        raise ValueError(f"Expected 2D (H,W) or 3D (C,H,W) tensor, got {ndim}D")

    align_corners = None if mode == "nearest" else False
    x = f.interpolate(x, scale_factor=scale, mode=mode, align_corners=align_corners)

    if ndim == 2:
        return x.squeeze(0).squeeze(0)
    return x.squeeze(0)
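
For example, resampling a 256x256 layout at 1.0 nm/pixel to 0.5 nm/pixel doubles the spatial size (a minimal sketch):

import torch

from openlithohub.data.transforms import align_resolution

design = torch.rand(256, 256)  # (H, W) at 1.0 nm/pixel
resampled = align_resolution(design, source_pixel_nm=1.0, target_pixel_nm=0.5)
print(resampled.shape)  # torch.Size([512, 512])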

normalize_to_binary(tensor, threshold=0.5)

Threshold a continuous tensor to binary (0/1).

Source code in src/openlithohub/data/transforms.py
def normalize_to_binary(tensor: torch.Tensor, threshold: float = 0.5) -> torch.Tensor:
    """Threshold a continuous tensor to binary (0/1)."""
    return (tensor > threshold).float()
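
Bilinear resampling introduces intermediate gray values, so a common follow-up is to re-threshold aligned layouts. A sketch chaining the two transforms above:

import torch

from openlithohub.data.transforms import align_resolution, normalize_to_binary

layout = (torch.rand(128, 128) > 0.5).float()  # binary layout at 1.0 nm/pixel
aligned = align_resolution(layout, 1.0, 0.5)   # bilinear output is no longer strictly 0/1
binary = normalize_to_binary(aligned, threshold=0.5)
print(binary.unique())                         # tensor([0., 1.])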