Skip to content

Workflow Engine

openlithohub.workflow.parsing

Layout file parsing via KLayout Python API.

parse_layout(path)

Parse an OASIS or GDSII layout file.

Returns dictionary with 'cells', 'layers', 'bounding_box', and '_layout' handle.

Source code in src/openlithohub/workflow/parsing.py
def parse_layout(path: str | Path) -> dict[str, Any]:
    """Parse an OASIS or GDSII layout file.

    Returns dictionary with 'cells', 'layers', 'bounding_box', and '_layout' handle.
    """
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Layout file not found: {path}")

    suffix = path.suffix.lower()
    if suffix not in (".oas", ".gds", ".gds2", ".oasis"):
        raise ValueError(f"Unsupported layout format: {suffix}. Use .oas or .gds")

    try:
        import klayout.db as db
    except ImportError:
        raise ImportError(
            "klayout is required for layout parsing. "
            "Install with: pip install openlithohub[workflow]"
        ) from None

    layout = db.Layout()
    layout.read(str(path))

    cells: list[dict[str, Any]] = []
    for cell_idx in range(layout.cells()):
        cell = layout.cell(cell_idx)
        cells.append(
            {
                "name": cell.name,
                "index": cell_idx,
                "instance_count": cell.child_instances(),
            }
        )

    layers: list[dict[str, Any]] = []
    for layer_idx in layout.layer_indices():
        info = layout.get_info(layer_idx)
        layers.append(
            {
                "layer": info.layer,
                "datatype": info.datatype,
                "name": info.name if info.name else f"{info.layer}/{info.datatype}",
            }
        )

    top_cell = layout.top_cell()
    bbox = top_cell.bbox()
    bounding_box = {
        "x_min": bbox.left,
        "y_min": bbox.bottom,
        "x_max": bbox.right,
        "y_max": bbox.top,
        "dbu": layout.dbu,
    }

    return {
        "cells": cells,
        "layers": layers,
        "bounding_box": bounding_box,
        "_layout": layout,
    }

openlithohub.workflow.tiling

Full-chip tiling strategy for distributed processing.

Tile dataclass

A single tile from a layout partition.

Source code in src/openlithohub/workflow/tiling.py
@dataclass
class Tile:
    """A single tile from a layout partition.

    Produced by ``tile_layout``; ``width``/``height`` record the valid
    (unpadded) extent so ``stitch_tiles`` can crop zero-padding back off.
    """

    # Tile pixel data; boundary tiles are zero-padded up to the tile size.
    tensor: torch.Tensor
    # Top-left corner of the tile within the full layout, in pixels.
    origin_x: int
    origin_y: int
    # Valid (unpadded) extent of the tile in pixels.
    width: int
    height: int
    # Overlap with adjacent tiles, in pixels (drives blend ramps on stitch).
    overlap: int

tile_layout(layout_tensor, tile_size=2048, overlap=128)

Partition a full-chip layout tensor into overlapping tiles.

Uses a sliding window with configurable overlap. Boundary tiles are zero-padded to maintain uniform tile dimensions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `layout_tensor` | `Tensor` | Full layout as tensor (H, W). | *required* |
| `tile_size` | `int` | Size of each square tile in pixels. | `2048` |
| `overlap` | `int` | Overlap between adjacent tiles for seamless stitching. | `128` |

Returns:

Type Description
list[Tile]

List of Tile objects covering the full layout.

Raises:

Type Description
ValueError

If overlap >= tile_size or tile_size <= 0.

Source code in src/openlithohub/workflow/tiling.py
def tile_layout(
    layout_tensor: torch.Tensor,
    tile_size: int = 2048,
    overlap: int = 128,
) -> list[Tile]:
    """Partition a full-chip layout tensor into overlapping tiles.

    Slides a square window of ``tile_size`` pixels across the layout with a
    stride of ``tile_size - overlap``. Windows that extend past the layout
    edge are zero-padded on the bottom/right so every tile has identical
    dimensions.

    Args:
        layout_tensor: Full layout as tensor (H, W).
        tile_size: Size of each square tile in pixels.
        overlap: Overlap between adjacent tiles for seamless stitching.

    Returns:
        List of Tile objects covering the full layout.

    Raises:
        ValueError: If overlap >= tile_size, overlap < 0, or tile_size <= 0.
    """
    if tile_size <= 0:
        raise ValueError(f"tile_size must be positive, got {tile_size}")
    if overlap < 0:
        raise ValueError(f"overlap must be non-negative, got {overlap}")
    if overlap >= tile_size:
        raise ValueError(f"overlap ({overlap}) must be less than tile_size ({tile_size})")

    height, width = layout_tensor.shape[-2], layout_tensor.shape[-1]
    stride = tile_size - overlap

    tiles: list[Tile] = []
    # range(0, n, stride) visits exactly the window origins the sliding
    # window needs: every origin strictly inside the layout.
    for oy in range(0, height, stride):
        for ox in range(0, width, stride):
            y_stop = min(oy + tile_size, height)
            x_stop = min(ox + tile_size, width)
            window = layout_tensor[..., oy:y_stop, ox:x_stop]

            valid_h = y_stop - oy
            valid_w = x_stop - ox
            if valid_h < tile_size or valid_w < tile_size:
                # Pad bottom/right with zeros to restore uniform tile shape.
                window = functional.pad(
                    window,
                    (0, tile_size - valid_w, 0, tile_size - valid_h),
                    value=0.0,
                )

            tiles.append(
                Tile(
                    tensor=window,
                    origin_x=ox,
                    origin_y=oy,
                    width=valid_w,
                    height=valid_h,
                    overlap=overlap,
                )
            )

    return tiles

stitch_tiles(tiles, output_shape)

Reassemble optimized tiles into a full-chip tensor.

Uses linear blending in overlap regions to avoid seam artifacts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `tiles` | `list[tuple[Tile, Tensor]]` | List of (original_tile, optimized_tensor) pairs. | *required* |
| `output_shape` | `tuple[int, int]` | (H, W) of the full output tensor. | *required* |

Returns:

Type Description
Tensor

Stitched tensor of shape output_shape.

Source code in src/openlithohub/workflow/tiling.py
def stitch_tiles(
    tiles: list[tuple["Tile", torch.Tensor]],
    output_shape: tuple[int, int],
) -> torch.Tensor:
    """Reassemble optimized tiles into a full-chip tensor.

    Uses linear blending in overlap regions to avoid seam artifacts: each
    tile contributes a 0->1 weight ramp on the left/top bands it shares with
    a previously-placed neighbor, and the accumulated sum is normalized by
    the total weight per pixel.

    Args:
        tiles: List of (original_tile, optimized_tensor) pairs.
        output_shape: (H, W) of the full output tensor.

    Returns:
        Stitched tensor of shape output_shape, allocated on the same device
        and dtype as the optimized tensors (CPU default dtype when ``tiles``
        is empty).
    """
    h, w = output_shape

    if not tiles:
        # Nothing to stitch; return an all-zero canvas instead of dividing
        # an uninitialized weight map.
        return torch.zeros(h, w)

    # Allocate accumulators with the results' dtype/device so stitching
    # works for float64 or GPU tensors (a bare torch.zeros(h, w) would be
    # CPU float32 and silently downcast or fail on device mismatch).
    ref = tiles[0][1]
    output = torch.zeros(h, w, dtype=ref.dtype, device=ref.device)
    weight_map = torch.zeros(h, w, dtype=ref.dtype, device=ref.device)

    for tile, result in tiles:
        tile_h = tile.height
        tile_w = tile.width
        # Crop any zero-padding, then collapse leading singleton batch/
        # channel dims. reshape (not squeeze) is used so size-1 *spatial*
        # dims survive when tile_h or tile_w is 1.
        result_2d = result[..., :tile_h, :tile_w].reshape(tile_h, tile_w)

        blend = torch.ones(tile_h, tile_w, dtype=ref.dtype, device=ref.device)

        if tile.overlap > 0:
            ramp = torch.linspace(
                0.0, 1.0, tile.overlap, dtype=ref.dtype, device=ref.device
            )

            # Ramp up only on edges shared with an already-placed neighbor
            # (tiles at origin 0 have no left/top neighbor).
            if tile.origin_x > 0:
                ol = min(tile.overlap, tile_w)
                blend[:, :ol] *= ramp[:ol].unsqueeze(0)

            if tile.origin_y > 0:
                ol = min(tile.overlap, tile_h)
                blend[:ol, :] *= ramp[:ol].unsqueeze(1)

        y_end = tile.origin_y + tile_h
        x_end = tile.origin_x + tile_w
        output[tile.origin_y : y_end, tile.origin_x : x_end] += result_2d * blend
        weight_map[tile.origin_y : y_end, tile.origin_x : x_end] += blend

    # Normalize covered pixels only; pixels no tile touched stay zero.
    nonzero = weight_map > 0
    output[nonzero] /= weight_map[nonzero]
    return output

openlithohub.workflow.contour.manhattan

Manhattan (staircase) contour extraction for traditional VSB writers.

extract_manhattan_contour(mask, pixel_size_nm=1.0)

Extract Manhattan (rectilinear) polygon contours from a binary mask.

Traces pixel-edge boundaries between foreground and background regions, producing axis-aligned polygons suitable for VSB mask writers.

The algorithm: 1. Find horizontal and vertical boundary edges between 0/1 pixels 2. Build an adjacency graph of edges sharing vertices 3. Trace closed loops through the edge graph

Vertices are at pixel corners (not pixel centers), scaled by pixel_size_nm.

Parameters:

Name Type Description Default
mask Tensor

Binary mask tensor (H, W).

required
pixel_size_nm float

Physical pixel size for coordinate scaling.

1.0

Returns:

| Type | Description |
| --- | --- |
| `list[list[tuple[float, float]]]` | List of polygons, each as a list of (x_nm, y_nm) vertices in order. Outer boundaries are clockwise, holes are counter-clockwise. |

Source code in src/openlithohub/workflow/contour/manhattan.py
def extract_manhattan_contour(
    mask: torch.Tensor,
    pixel_size_nm: float = 1.0,
) -> list[list[tuple[float, float]]]:
    """Extract Manhattan (rectilinear) polygon contours from a binary mask.

    Traces pixel-edge boundaries between foreground and background regions,
    producing axis-aligned polygons suitable for VSB mask writers.

    The algorithm:
    1. Find horizontal and vertical boundary edges between 0/1 pixels
    2. Build an adjacency graph of edges sharing vertices
    3. Trace closed loops through the edge graph

    Vertices are at pixel corners (not pixel centers), scaled by pixel_size_nm.

    Args:
        mask: Binary mask tensor (H, W).
        pixel_size_nm: Physical pixel size for coordinate scaling.

    Returns:
        List of polygons, each as a list of (x_nm, y_nm) vertices in order.
        Outer boundaries are clockwise, holes are counter-clockwise.
    """
    # Threshold at 0.5 so soft/float masks binarize, then move to CPU numpy
    # for the pixel-pair edge scans below.
    m = ensure_2d(mask)
    arr = (m > 0.5).detach().cpu().numpy().astype(np.int8)
    h, w = arr.shape  # NOTE(review): h/w are currently unused below.

    # Pad with zeros to ensure boundaries are closed at image edges
    padded = np.pad(arr, 1, mode="constant", constant_values=0)

    # Find horizontal edges: between row i and row i+1
    # An edge exists where padded[i, j] != padded[i+1, j]
    # Edges are stored as integer vertex-space segments (x1, y1, x2, y2).
    h_edges: set[tuple[int, int, int, int]] = set()
    v_edges: set[tuple[int, int, int, int]] = set()

    ph, pw = padded.shape

    # Horizontal edges: edge from (j, i) to (j+1, i) in vertex space
    # A horizontal edge at grid row i exists between pixel rows i-1 and i
    for i in range(ph - 1):
        for j in range(pw - 1):
            if padded[i, j] != padded[i + 1, j]:
                h_edges.add((j, i + 1, j + 1, i + 1))  # (x1, y1, x2, y2) left to right

    # Vertical edges: edge from (j, i) to (j, i+1) in vertex space
    for i in range(ph - 1):
        for j in range(pw - 1):
            if padded[i, j] != padded[i, j + 1]:
                v_edges.add((j + 1, i, j + 1, i + 1))  # top to bottom

    all_edges = h_edges | v_edges
    if not all_edges:
        # Empty mask (or all-foreground clipped by padding): no contours.
        return []

    # Build adjacency: vertex -> list of connected edges
    vertex_to_edges: dict[tuple[int, int], list[tuple[int, int, int, int]]] = {}
    for edge in all_edges:
        x1, y1, x2, y2 = edge
        vertex_to_edges.setdefault((x1, y1), []).append(edge)
        vertex_to_edges.setdefault((x2, y2), []).append(edge)

    # Trace closed loops
    # Each pass consumes edges from `remaining` until it returns to the
    # starting vertex (closed loop) or runs out of connected edges.
    remaining = set(all_edges)
    polygons: list[list[tuple[float, float]]] = []

    while remaining:
        start_edge = next(iter(remaining))
        polygon_vertices: list[tuple[int, int]] = []

        x1, y1, x2, y2 = start_edge
        polygon_vertices.append((x1, y1))
        current_vertex = (x2, y2)
        remaining.discard(start_edge)

        while current_vertex != (x1, y1):
            polygon_vertices.append(current_vertex)
            candidates = vertex_to_edges.get(current_vertex, [])
            # NOTE(review): at vertices where 4 edges meet (checkerboard
            # corners) the next-edge choice is arbitrary (first unvisited),
            # so loop splitting/orientation there is not obviously
            # guaranteed — verify against a checkerboard test mask.
            next_edge = None
            for e in candidates:
                if e in remaining:
                    next_edge = e
                    break

            if next_edge is None:
                # Dead end: the chain could not be closed; stop tracing.
                break

            remaining.discard(next_edge)
            ex1, ey1, ex2, ey2 = next_edge
            # Step to the edge endpoint we did not arrive from.
            current_vertex = (ex2, ey2) if (ex1, ey1) == current_vertex else (ex1, ey1)

        # Convert to physical coordinates (subtract 1 for padding offset)
        scaled: list[tuple[float, float]] = []
        for vx, vy in polygon_vertices:
            scaled.append(((vx - 1) * pixel_size_nm, (vy - 1) * pixel_size_nm))

        # Degenerate traces (fewer than 3 vertices) are dropped; collinear
        # runs are merged by the sibling helper before returning.
        if len(scaled) >= 3:
            polygons.append(_simplify_collinear(scaled))

    return polygons

openlithohub.workflow.contour.curvilinear

Curvilinear contour extraction and B-spline fitting for OASIS.MBW export.

BSplineCurve dataclass

Representation of a fitted B-spline curve.

Source code in src/openlithohub/workflow/contour/curvilinear.py
@dataclass
class BSplineCurve:
    """Representation of a fitted B-spline curve."""

    control_points: torch.Tensor
    knots: torch.Tensor
    degree: int = 3

fit_bspline(contour_pixels, tolerance_nm=0.5, pixel_size_nm=1.0)

Fit B-spline curves to pixel-level contour data.

If input is a 2D binary mask, extracts boundary contours first. If input is an (N, 2) tensor, treats it as a single ordered point loop.

Source code in src/openlithohub/workflow/contour/curvilinear.py
def fit_bspline(
    contour_pixels: torch.Tensor,
    tolerance_nm: float = 0.5,
    pixel_size_nm: float = 1.0,
) -> list[BSplineCurve]:
    """Fit B-spline curves to pixel-level contour data.

    An (N, 2) input is treated as a single ordered point loop; any other
    shape is treated as a 2D binary mask whose boundary contours are
    extracted first. Each loop is fitted with a periodic cubic spline.

    Raises:
        ImportError: If scipy is not installed.
    """
    try:
        from scipy.interpolate import splprep
    except ImportError:
        raise ImportError(
            "scipy is required for B-spline fitting. "
            "Install with: pip install openlithohub[workflow]"
        ) from None

    if contour_pixels.ndim == 2 and contour_pixels.shape[1] == 2:
        # Already an ordered loop of points.
        loops = [contour_pixels.detach().cpu().numpy()]
    else:
        # Binarize the mask and trace its boundary loops.
        binary = (ensure_2d(contour_pixels) > 0.5).detach().cpu().numpy().astype(np.int8)
        loops = _trace_contour(binary)

    # Smoothing factor in pixel units; guard against zero pixel size.
    smooth_factor = tolerance_nm / max(pixel_size_nm, 1e-6)
    fitted: list[BSplineCurve] = []

    for pts in loops:
        # A cubic periodic spline needs enough points to be well-posed.
        if len(pts) < 5:
            continue

        physical = pts * pixel_size_nm

        try:
            # Points are passed as (col, row) = (x, y); per=True closes the loop.
            tck, _ = splprep(
                [physical[:, 1], physical[:, 0]],
                s=smooth_factor * len(pts),
                per=True,
                k=3,
            )
        except (ValueError, TypeError):
            # Degenerate loops that scipy rejects are skipped, not fatal.
            continue

        knots_arr, coeffs, _degree = tck
        ctrl = np.stack(
            [
                np.array(coeffs[0], dtype=np.float32),
                np.array(coeffs[1], dtype=np.float32),
            ],
            axis=1,
        )

        fitted.append(
            BSplineCurve(
                control_points=torch.tensor(ctrl, dtype=torch.float32),
                knots=torch.tensor(knots_arr, dtype=torch.float32),
                degree=3,
            )
        )

    return fitted

export_oasis_mbw(curves, output_path, *, format_version='2.1', samples_per_curve=64)

Serialize B-spline curves to OASIS.MBW format for multi-beam writers.

MVP implementation: samples curves to high-resolution polygons and writes a simplified OASIS-compatible binary file with polygon records. Native curve primitives per SEMI P44 are planned for a future release.

Source code in src/openlithohub/workflow/contour/curvilinear.py
def export_oasis_mbw(
    curves: list[BSplineCurve],
    output_path: str,
    *,
    format_version: str = "2.1",
    samples_per_curve: int = 64,
) -> None:
    """Serialize B-spline curves to OASIS.MBW format for multi-beam writers.

    MVP implementation: samples curves to high-resolution polygons and writes
    a simplified OASIS-compatible binary file with polygon records.
    Native curve primitives per SEMI P44 are planned for a future release.
    """
    try:
        from scipy.interpolate import splev
    except ImportError:
        raise ImportError(
            "scipy is required for OASIS.MBW export. "
            "Install with: pip install openlithohub[workflow]"
        ) from None

    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    # Uniform parameter grid; endpoint=False avoids duplicating the closing
    # vertex of a periodic curve.
    u = np.linspace(0.0, 1.0, samples_per_curve, endpoint=False)

    polygons: list[list[tuple[float, float]]] = []
    for spline in curves:
        cp = spline.control_points.numpy()
        # Rebuild the scipy (knots, coefficients, degree) tuple from the
        # dataclass fields and evaluate it on the grid.
        tck = (spline.knots.numpy(), [cp[:, 0], cp[:, 1]], spline.degree)
        xs, ys = splev(u, tck)
        polygons.append(
            [(float(px), float(py)) for px, py in zip(xs, ys, strict=False)]
        )

    _write_oasis_binary(polygons, target, format_version)

openlithohub.workflow.export

OASIS/GDSII export coordination.

export_oasis(mask, output_path, *, mode='curvilinear', pixel_size_nm=1.0)

Export an optimized mask tensor to OASIS format.

For manhattan mode, extracts rectilinear contours and writes via KLayout. For curvilinear mode, fits B-splines and writes OASIS.MBW format.

Source code in src/openlithohub/workflow/export.py
def export_oasis(
    mask: torch.Tensor,
    output_path: str | Path,
    *,
    mode: str = "curvilinear",
    pixel_size_nm: float = 1.0,
) -> None:
    """Export an optimized mask tensor to OASIS format.

    For manhattan mode, extracts rectilinear contours and writes via KLayout.
    For curvilinear mode, fits B-splines and writes OASIS.MBW format.
    """
    if mode not in ("manhattan", "curvilinear"):
        raise ValueError(f"mode must be 'manhattan' or 'curvilinear', got '{mode}'")

    m = ensure_2d(mask)
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if mode == "manhattan":
        _export_manhattan(m, output_path, pixel_size_nm)
    else:
        _export_curvilinear(m, output_path, pixel_size_nm)