diff --git a/machine-learning/Dockerfile b/machine-learning/Dockerfile index 6c976d4612..a96559a57c 100644 --- a/machine-learning/Dockerfile +++ b/machine-learning/Dockerfile @@ -129,12 +129,13 @@ COPY --from=builder-armnn \ FROM prod-cpu AS prod-rknn # renovate: datasource=github-tags depName=airockchip/rknn-toolkit2 -ARG RKNN_TOOLKIT_VERSION="v2.3.0" +ARG RKNN_TOOLKIT_VERSION="v2.3.2" ENV LD_PRELOAD=/usr/lib/libmimalloc.so.2 \ MACHINE_LEARNING_MODEL_ARENA=false -ADD --checksum=sha256:73993ed4b440460825f21611731564503cc1d5a0c123746477da6cd574f34885 "https://github.com/airockchip/rknn-toolkit2/raw/refs/tags/${RKNN_TOOLKIT_VERSION}/rknpu2/runtime/Linux/librknn_api/aarch64/librknnrt.so" /usr/lib/ +ADD --checksum=sha256:d31fc19c85b85f6091b2bd0f6af9d962d5264a4e410bfb536402ec92bac738e8 "https://github.com/airockchip/rknn-toolkit2/raw/refs/tags/${RKNN_TOOLKIT_VERSION}/rknpu2/runtime/Linux/librknn_api/aarch64/librknnrt.so" /usr/lib/ +ADD --checksum=sha256:c48e11a6f41b451a5fd1e4ad774ea60252d3d94f78bee9b21ea3d21b21deba9a "https://github.com/airockchip/rknn-toolkit2/raw/refs/tags/${RKNN_TOOLKIT_VERSION}/rknpu2/runtime/Linux/librknn_api/include/rknn_api.h" /usr/include/ FROM prod-${DEVICE} AS prod @@ -167,6 +168,21 @@ COPY --from=builder /opt/venv /opt/venv COPY scripts/healthcheck.py . COPY immich_ml immich_ml +RUN if [ "$DEVICE" = "rknn" ]; then \ + apt-get update; \ + apt-get install -y g++ libc6-dev; \ + cd immich_ml/sessions/rknn/native; \ + RKNN_HEADER=/usr/include/rknn_api.h \ + RKNN_LIBRARY=/usr/lib/librknnrt.so \ + ./build-cross.sh; \ + apt-get purge -y --auto-remove \ + g++ \ + libc6-dev; \ + apt-get clean; \ + rm -rf /var/lib/apt/lists/*; \ + rm /usr/include/rknn_api.h; \ + fi + ARG BUILD_ID ARG BUILD_IMAGE ARG BUILD_SOURCE_REF diff --git a/machine-learning/immich_ml/sessions/rknn/__init__.py b/machine-learning/immich_ml/sessions/rknn/__init__.py index e388e4febc..fa17d9386e 100644 --- a/machine-learning/immich_ml/sessions/rknn/__init__.py +++ b/machine-learning/immich_ml/sessions/rknn/__init__.py @@ -1,76 +1,10 @@ -from __future__ import annotations +from .immich_session import RknnPoolExecutor, RknnSession, is_available, model_prefix, run_inference, soc_name -from pathlib import Path -from typing import Any, NamedTuple - -import numpy as np -from numpy.typing import NDArray - -from immich_ml.config import log, settings -from immich_ml.schemas import SessionNode - -from .rknnpool import RknnPoolExecutor, is_available, soc_name - -is_available = is_available and settings.rknn -model_prefix = Path("rknpu") / soc_name if is_available and soc_name is not None else None - - -def run_inference(rknn_lite: Any, input: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]: - outputs: list[NDArray[np.float32]] = rknn_lite.inference(inputs=input, data_format="nchw") - return outputs - - -input_output_mapping: dict[str, dict[str, Any]] = { - "detection": { - "input": {"norm_tensor:0": (1, 3, 640, 640)}, - "output": { - "norm_tensor:1": (12800, 1), - "norm_tensor:2": (3200, 1), - "norm_tensor:3": (800, 1), - "norm_tensor:4": (12800, 4), - "norm_tensor:5": (3200, 4), - "norm_tensor:6": (800, 4), - "norm_tensor:7": (12800, 10), - "norm_tensor:8": (3200, 10), - "norm_tensor:9": (800, 10), - }, - }, - "recognition": {"input": {"norm_tensor:0": (1, 3, 112, 112)}, "output": {"norm_tensor:1": (1, 512)}}, -} - - -class RknnSession: - def __init__(self, model_path: Path) -> None: - self.model_type = "detection" if "detection" in model_path.parts else "recognition" - self.tpe = settings.rknn_threads - - 
log.info(f"Loading RKNN model from {model_path} with {self.tpe} threads.") - self.rknnpool = RknnPoolExecutor(model_path=model_path.as_posix(), tpes=self.tpe, func=run_inference) - log.info(f"Loaded RKNN model from {model_path} with {self.tpe} threads.") - - def get_inputs(self) -> list[SessionNode]: - return [RknnNode(name=k, shape=v) for k, v in input_output_mapping[self.model_type]["input"].items()] - - def get_outputs(self) -> list[SessionNode]: - return [RknnNode(name=k, shape=v) for k, v in input_output_mapping[self.model_type]["output"].items()] - - def run( - self, - output_names: list[str] | None, - input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]], - run_options: Any = None, - ) -> list[NDArray[np.float32]]: - input_data: list[NDArray[np.float32]] = [np.ascontiguousarray(v) for v in input_feed.values()] - self.rknnpool.put(input_data) - res = self.rknnpool.get() - if res is None: - raise RuntimeError("RKNN inference failed!") - return res - - -class RknnNode(NamedTuple): - name: str | None - shape: tuple[int, ...] - - -__all__ = ["RknnSession", "RknnNode", "is_available", "soc_name", "model_prefix"] +__all__ = [ + "RknnSession", + "RknnPoolExecutor", + "run_inference", + "is_available", + "soc_name", + "model_prefix", +] diff --git a/machine-learning/immich_ml/sessions/rknn/immich_session.py b/machine-learning/immich_ml/sessions/rknn/immich_session.py new file mode 100644 index 0000000000..f7246d1bd7 --- /dev/null +++ b/machine-learning/immich_ml/sessions/rknn/immich_session.py @@ -0,0 +1,266 @@ +from __future__ import annotations + +import threading +import time +from concurrent.futures import Future, ThreadPoolExecutor +from pathlib import Path +from types import TracebackType +from typing import TYPE_CHECKING, Any, NamedTuple, Optional, Protocol, Sequence, cast + +import numpy as np +from numpy.typing import NDArray + +from immich_ml.config import log, settings +from immich_ml.models.constants import RKNN_SUPPORTED_SOCS + +from .native import rknn_pool as _native_mod # pragma: no cover - compiled extension load + +if TYPE_CHECKING: + class NativeRKNNExecutor(Protocol): + def infer(self, inputs: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]: ... + + def get_io_info(self) -> dict[str, Any]: ... +else: + NativeRKNNExecutor = _native_mod.NativeRKNNExecutor + + +__all__ = [ + "RknnSession", + "RknnPoolExecutor", + "run_inference", + "is_available", + "soc_name", + "model_prefix", +] + + +def get_soc(device_tree_path: Path | str) -> str | None: + try: + with Path(device_tree_path).open() as f: + device_compatible_str = f.read().lower() + for soc in RKNN_SUPPORTED_SOCS: + if soc in device_compatible_str: + return soc + except OSError as exc: + log.debug("Could not read device tree %s: %s", device_tree_path, exc) + return None + + +soc_name = get_soc("/proc/device-tree/compatible") +is_available = soc_name is not None and settings.rknn +model_prefix = Path("rknpu") / soc_name if is_available and soc_name else None + + +class SessionNode(NamedTuple): + name: Optional[str] + shape: tuple[int, ...] + + +class RKNNInferenceResult(NamedTuple): + tag: Any + start_time: float + end_time: float + duration_s: float + outputs: list[NDArray[np.float32]] + + +class InferenceExecutor(Protocol): + def infer(self, inputs: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]: ... 
+ + +def run_inference(executor: InferenceExecutor, inputs: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]: + return executor.infer(inputs) + + +class RknnPoolExecutor: + def __init__(self, model_path: str | Path, tpes: int) -> None: + if tpes < 1: + raise ValueError("tpes must be >= 1") + model_path_str = Path(model_path).as_posix() + self._native = NativeRKNNExecutor(model_path_str, num_workers=tpes) + self._executor = ThreadPoolExecutor(max_workers=tpes, thread_name_prefix="rknn-worker") + self._closed = False + + def _run_inference(self, inputs: list[NDArray[np.float32]], tag: Any) -> RKNNInferenceResult: + start = time.perf_counter() + outputs = self._native.infer(inputs) + end = time.perf_counter() + return RKNNInferenceResult( + tag=tag, + start_time=start, + end_time=end, + duration_s=end - start, + outputs=outputs, + ) + + def submit(self, inputs: Sequence[NDArray[np.float32]], *, tag: Any = None) -> Future[RKNNInferenceResult]: + if self._closed: + raise RuntimeError("Pool is closed") + return self._executor.submit(self._run_inference, list(inputs), tag) + + def put(self, inputs: Sequence[NDArray[np.float32]], *, tag: Any = None) -> Future[RKNNInferenceResult]: + return self.submit(inputs, tag=tag) + + def close(self, *, wait: bool = True) -> None: + if self._closed: + return + self._closed = True + self._executor.shutdown(wait=wait) + + @property + def executor(self) -> NativeRKNNExecutor: + return self._native + + def __enter__(self) -> "RknnPoolExecutor": + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + self.close() + + +class RknnSession: + def __init__( + self, + model_path: Path | str, + *, + num_workers: Optional[int] = None, + logger: Any = None, + ) -> None: + if not is_available: + raise RuntimeError("RKNN is not available on this device") + self.model_path = Path(model_path) + self.log = logger or log + default_workers = getattr(settings, "rknn_threads", 1) + self.tpe = num_workers or default_workers + if self.tpe < 1: + raise ValueError("num_workers must be >= 1") + self.log.info( + "Loading RKNN model from %s with %s worker(s).", + self.model_path, + self.tpe, + ) + self.rknnpool = RknnPoolExecutor(self.model_path, self.tpe) + self._io_info = self._normalize_io_info(self.rknnpool.executor.get_io_info()) + self._input_nodes = self._build_nodes("inputs") + self._output_nodes = self._build_nodes("outputs") + self.log.info("Loaded RKNN model from %s.", self.model_path) + + @property + def io_info(self) -> dict[str, Any]: + return self._io_info + + def get_inputs(self) -> list[SessionNode]: + return self._input_nodes + + def get_outputs(self) -> list[SessionNode]: + return self._output_nodes + + def run( + self, + _output_names: Sequence[str] | None, + input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]], + _run_options: Any = None, + ) -> list[NDArray[np.float32]]: + return self.run_async(_output_names, input_feed, _run_options).result().outputs + + def run_async( + self, + _output_names: Sequence[str] | None, + input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]], + _run_options: Any = None, + ) -> Future[RKNNInferenceResult]: + inputs_list = list(input_feed.values()) + if not inputs_list: + raise ValueError("input_feed must not be empty") + + batch_sizes = {int(x.shape[0]) for x in inputs_list} + if len(batch_sizes) != 1: + raise ValueError(f"All inputs must have the same batch size, got 
{sorted(batch_sizes)}") + + batch_size = batch_sizes.pop() + if batch_size <= 1: + return self.rknnpool.put(inputs_list) + + # Split each input tensor into per-sample slices of shape (1, ...) + per_sample_inputs = [[inp[i : i + 1] for inp in inputs_list] for i in range(batch_size)] + sub_futures = [self.rknnpool.put(sample) for sample in per_sample_inputs] + + parent_future: Future[RKNNInferenceResult] = Future() + + def _aggregate() -> None: + try: + results = [f.result() for f in sub_futures] + num_outputs = len(results[0].outputs) + stacked_outputs = [np.concatenate([r.outputs[j] for r in results], axis=0) for j in range(num_outputs)] + start_time = min(r.start_time for r in results) + end_time = max(r.end_time for r in results) + parent_future.set_result( + RKNNInferenceResult( + tag=None, + start_time=start_time, + end_time=end_time, + duration_s=end_time - start_time, + outputs=stacked_outputs, + ), + ) + except Exception as exc: # noqa: BLE001 + if not parent_future.done(): + parent_future.set_exception(exc) + + threading.Thread(target=_aggregate, daemon=True).start() + return parent_future + + def close(self) -> None: + self.rknnpool.close() + + def _build_nodes(self, key: str) -> list[SessionNode]: + nodes: list[SessionNode] = [] + for entry in self._io_info.get(key, []): + shape = self._shape_from_entry(entry) + if key == "inputs" and shape: + # Represent the batch dimension symbolically for readability while + # keeping the static type compatible with the ModelSession protocol. + symbolic_shape_any: tuple[Any, ...] = ("batch", *shape[1:]) + symbolic_shape = cast(tuple[int, ...], symbolic_shape_any) + else: + symbolic_shape = shape + nodes.append(SessionNode(name=entry.get("name"), shape=symbolic_shape)) + return nodes + + @staticmethod + def _shape_from_entry(entry: dict[str, Any]) -> tuple[int, ...]: + if dims := entry.get("dims"): + return tuple(int(dim) for dim in dims) + dyn = entry.get("dynamic", {}) + ranges = dyn.get("ranges", []) + if ranges: + return tuple(int(dim) for dim in ranges[-1]) + raise ValueError(f"Cannot determine shape from entry: {entry}") + + def _normalize_io_info(self, info: dict[str, Any]) -> dict[str, Any]: + return { + **info, + "inputs": [self._normalize_tensor_desc(t) for t in info.get("inputs", [])], + "outputs": [self._normalize_tensor_desc(t) for t in info.get("outputs", [])], + } + + @staticmethod + def _normalize_tensor_desc(tensor: dict[str, Any]) -> dict[str, Any]: + dims = list(RknnSession._shape_from_entry(tensor)) + desc = {**tensor, "dims": dims, "n_dims": len(dims)} + # Force NCHW format if the runtime reports NHWC tensors + if tensor.get("fmt") == 1 and len(dims) == 4: + n, h, w, c = dims + desc["dims"] = [n, c, h, w] + desc["fmt"] = 0 + dyn = desc.get("dynamic", {}) + if "ranges" in dyn: + dyn["ranges"] = [ + [shape[0], shape[3], shape[1], shape[2]] if len(shape) == 4 else shape for shape in dyn["ranges"] + ] + return desc diff --git a/machine-learning/immich_ml/sessions/rknn/inference_test.py b/machine-learning/immich_ml/sessions/rknn/inference_test.py new file mode 100644 index 0000000000..fc92b8a4da --- /dev/null +++ b/machine-learning/immich_ml/sessions/rknn/inference_test.py @@ -0,0 +1,138 @@ +import argparse +import time +from pathlib import Path +from typing import List + +import numpy as np +import numpy.typing as npt + +try: + from .immich_session import RknnSession +except ImportError: + from rknn_multi_executor.immich_session import RknnSession # type: ignore + + +def parse_shape(shape_str: str) -> List[int]: + parts = 
[p.strip() for p in shape_str.split(",") if p.strip()] + if not parts: + raise ValueError("Invalid shape string") + return [int(p) for p in parts] + + +def main() -> None: + parser = argparse.ArgumentParser(description="Minimal RKNN Native Executor usage") + parser.add_argument("--model", required=True, type=Path, help="Path to .rknn model") + parser.add_argument("--num-workers", type=int, default=3, help="Number of worker contexts") + parser.add_argument( + "--shape", + type=str, + default="1,3,640,640", + help="Input shape as comma-separated list, e.g. 1,3,640,640", + ) + parser.add_argument( + "--dtype", + type=str, + default="float32", + choices=["float32", "float16", "int32", "int8", "uint8"], + help="Data type for randomly generated input tensor", + ) + args = parser.parse_args() + + shape = parse_shape(args.shape) + if len(shape) < 2: + raise ValueError("Shape must have at least 2 dims (e.g., NCHW)") + + gen_t0 = time.perf_counter() + # Generate a random input tensor with the requested dtype + x: npt.NDArray[np.generic] + if args.dtype == "float32": + x = np.random.rand(*shape).astype(np.float32) + elif args.dtype == "float16": + x = np.random.rand(*shape).astype(np.float16) + elif args.dtype == "int32": + # Use a modest integer range; adjust as needed for your model (e.g., vocab size) + x = np.random.randint(0, 1000, size=tuple(shape), dtype=np.int32) + elif args.dtype == "int8": + x = np.random.randint(-128, 128, size=tuple(shape), dtype=np.int8) + elif args.dtype == "uint8": + x = np.random.randint(0, 256, size=tuple(shape), dtype=np.uint8) + else: + raise ValueError(f"Unsupported dtype: {args.dtype}") + gen_t1 = time.perf_counter() + print(f"[timing] generated random {args.dtype} tensor with shape {shape} " f"in {(gen_t1-gen_t0)*1000:.2f} ms") + + time.sleep(1) + session_t0 = time.perf_counter() + session = RknnSession(args.model.as_posix(), num_workers=args.num_workers) + session_t1 = time.perf_counter() + print(session.get_inputs()) + print(f"[timing] session init took {(session_t1-session_t0)*1000:.2f} ms") + try: + print("IO description:", session.io_info) + inputs = session.get_inputs() + if not inputs: + raise RuntimeError("Model exposes no inputs") + input_name = inputs[0].name or "input" + + # Serial demo + for i in range(3): + t0 = time.perf_counter() + outs = session.run(None, {input_name: x}) + t1 = time.perf_counter() + print( + f"[serial {i+1}] start={t0:.6f}s end={t1:.6f}s " + f"dur_ms={(t1-t0)*1000:.2f} shapes={[getattr(o, 'shape', None) for o in outs]}" + ) + + # Batch demo (single RKNN session call with batched input to exercise batch mode) + batch_repeats = 3 + if shape[0] == 1: + batch_shape = [batch_repeats, *shape[1:]] + else: + batch_shape = shape + + x_batch: npt.NDArray[np.generic] + if args.dtype == "float32": + x_batch = np.random.rand(*batch_shape).astype(np.float32) + elif args.dtype == "float16": + x_batch = np.random.rand(*batch_shape).astype(np.float16) + elif args.dtype == "int32": + x_batch = np.random.randint(0, 1000, size=tuple(batch_shape), dtype=np.int32) + elif args.dtype == "int8": + x_batch = np.random.randint(-128, 128, size=tuple(batch_shape), dtype=np.int8) + elif args.dtype == "uint8": + x_batch = np.random.randint(0, 256, size=tuple(batch_shape), dtype=np.uint8) + else: + raise ValueError(f"Unsupported dtype: {args.dtype}") + + for i in range(3): + t0 = time.perf_counter() + outs = session.run(None, {input_name: x_batch}) + t1 = time.perf_counter() + print( + f"[batch {i+1}] start={t0:.6f}s end={t1:.6f}s " + 
f"dur_ms={(t1-t0)*1000:.2f} shapes={[getattr(o, 'shape', None) for o in outs]}" + ) + + time.sleep(1) + # Parallel demo using Immich-style pool + total_requests = 5 * args.num_workers + print(f"[pool] submitting {total_requests} requests with {args.num_workers} worker contexts") + batch_t0 = time.perf_counter() + futures = [] + for _ in range(total_requests): + futures.append(session.rknnpool.put([x])) + for idx, fut in enumerate(futures): + res = fut.result() + outs = res.outputs + lat_ms = res.duration_s * 1000.0 + print(f"[parallel {idx+1}] dur_ms={lat_ms:.2f} shapes={[getattr(o, 'shape', None) for o in outs]}") + batch_t1 = time.perf_counter() + print(f"[parallel batch] total_ms={(batch_t1-batch_t0)*1000:.2f}") + time.sleep(1) + finally: + session.close() + + +if __name__ == "__main__": + main() diff --git a/machine-learning/immich_ml/sessions/rknn/native/__init__.py b/machine-learning/immich_ml/sessions/rknn/native/__init__.py new file mode 100644 index 0000000000..51c4d71ca9 --- /dev/null +++ b/machine-learning/immich_ml/sessions/rknn/native/__init__.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from importlib import import_module + +__all__ = ["rknn_pool"] + +rknn_pool = import_module(f"{__name__}.rknn_pool") diff --git a/machine-learning/immich_ml/sessions/rknn/native/build-cross.sh b/machine-learning/immich_ml/sessions/rknn/native/build-cross.sh new file mode 100755 index 0000000000..e90dd06d69 --- /dev/null +++ b/machine-learning/immich_ml/sessions/rknn/native/build-cross.sh @@ -0,0 +1,64 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +SRC_FILE="${SCRIPT_DIR}/rknn_pool.cpp" +CXX="${CXX:-g++}" + +RKNN_HEADER="${RKNN_HEADER:-/usr/include/rknn_api.h}" +RKNN_LIBRARY="${RKNN_LIBRARY:-/usr/lib/librknnrt.so}" +RKNN_OUTPUT_DIR="${RKNN_OUTPUT_DIR:-$SCRIPT_DIR}" + +if [[ $# -ge 1 ]]; then + RKNN_HEADER="$1" +fi +if [[ $# -ge 2 ]]; then + RKNN_LIBRARY="$2" +fi +if [[ $# -ge 3 ]]; then + RKNN_OUTPUT_DIR="$3" +fi + +for file in "$SRC_FILE" "$RKNN_HEADER" "$RKNN_LIBRARY"; do + if [[ ! -f "$file" ]]; then + echo "Missing required file: $file" >&2 + exit 1 + fi +done + +if ! command -v python3 >/dev/null 2>&1; then + echo "python3 is required to determine include paths." >&2 + exit 1 +fi + +read -r -a PYBIND_FLAGS <<<"$(python3 -m pybind11 --includes)" +EXT_SUFFIX="$(python3 - <<'PY' +import sysconfig +print(sysconfig.get_config_var("EXT_SUFFIX") or ".so") +PY +)" + +INCLUDE_DIR="$(dirname "$(realpath "$RKNN_HEADER")")" +LIB_DIR="$(dirname "$(realpath "$RKNN_LIBRARY")")" +LIB_BASE="$(basename "$RKNN_LIBRARY")" +LIB_NAME="${LIB_BASE#lib}" +LIB_NAME="${LIB_NAME%%.so*}" + +mkdir -p "$RKNN_OUTPUT_DIR" +OUTPUT_PATH="${RKNN_OUTPUT_DIR}/rknn_pool${EXT_SUFFIX}" + +echo "[build-cross] Building ${OUTPUT_PATH}" + +"$CXX" "$SRC_FILE" \ + -shared -o "$OUTPUT_PATH" \ + -O3 -DNDEBUG -std=c++17 -fPIC \ + -Wall -Wextra -Wno-unused-parameter \ + -D_DEFAULT_SOURCE -D_GNU_SOURCE \ + -Wl,-z,relro,-z,now \ + -Wl,-rpath,"\$ORIGIN" -Wl,-rpath,"\$ORIGIN/." \ + "${PYBIND_FLAGS[@]}" \ + -I "$INCLUDE_DIR" \ + -L"$LIB_DIR" -l"$LIB_NAME" \ + -ldl -lpthread + +echo "[build-cross] Done." 
diff --git a/machine-learning/immich_ml/sessions/rknn/native/rknn_pool.cpp b/machine-learning/immich_ml/sessions/rknn/native/rknn_pool.cpp
new file mode 100644
index 0000000000..fe745720ce
--- /dev/null
+++ b/machine-learning/immich_ml/sessions/rknn/native/rknn_pool.cpp
@@ -0,0 +1,414 @@
+// RKNNLite-like minimal wrapper with correct input shape/type handling and dup_context sharing.
+
+#include <pybind11/numpy.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+#include <algorithm>
+#include <condition_variable>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <mutex>
+#include <stdexcept>
+#include <vector>
+
+#include "rknn_api.h"
+
+namespace py = pybind11;
+using namespace py::literals;
+
+// Helpers
+static bool is_dims_known(const rknn_tensor_attr& a) {
+    for (uint32_t d = 0; d < a.n_dims; ++d) {
+        if (a.dims[d] == 0) return false;
+    }
+    return true;
+}
+
+static py::array ensure_dtype(py::array arr, rknn_tensor_type t) {
+    py::dtype target_dtype;
+    switch (t) {
+        case RKNN_TENSOR_FLOAT16:
+            target_dtype = py::dtype("float16");
+            break;
+        case RKNN_TENSOR_FLOAT32:
+            target_dtype = py::dtype::of<float>();
+            break;
+        case RKNN_TENSOR_UINT8:
+            target_dtype = py::dtype::of<uint8_t>();
+            break;
+        case RKNN_TENSOR_INT8:
+            target_dtype = py::dtype::of<int8_t>();
+            break;
+        default:
+            return arr;
+    }
+    if (!arr.dtype().is(target_dtype)) {
+        return py::array::ensure(arr.attr("astype")(target_dtype), py::array::c_style);
+    }
+    return arr;
+}
+
+struct __attribute__((visibility("hidden"))) PreparedInput {
+    rknn_input tensor{};
+    std::vector<uint32_t> shape;
+    py::array buffer;
+};
+
+static py::list dims_to_list(const uint32_t* dims, uint32_t n_dims) {
+    py::list lst(n_dims);
+    for (uint32_t k = 0; k < n_dims; ++k) lst[k] = dims[k];
+    return lst;
+}
+
+static py::dict make_tensor_info(uint32_t index, const rknn_tensor_attr& attr) {
+    return py::dict("index"_a=index, "name"_a=py::str(attr.name), "fmt"_a=static_cast<int>(attr.fmt),
+                    "type"_a=static_cast<int>(attr.type), "n_dims"_a=attr.n_dims,
+                    "dims"_a=dims_to_list(attr.dims, attr.n_dims));
+}
+
+static py::dict make_dynamic_dict(const rknn_input_range& rng) {
+    py::list ranges(rng.shape_number);
+    for (uint32_t s = 0; s < rng.shape_number; ++s) {
+        py::list single(rng.n_dims);
+        for (uint32_t d = 0; d < rng.n_dims; ++d) {
+            single[d] = rng.dyn_range[s][d];
+        }
+        ranges[s] = single;
+    }
+    return py::dict("shape_number"_a=rng.shape_number, "n_dims"_a=rng.n_dims,
+                    "fmt"_a=static_cast<int>(rng.fmt), "name"_a=py::str(rng.name), "ranges"_a=ranges);
+}
+
+static py::array align_layout(py::array arr, const rknn_tensor_attr& attr) {
+    if (attr.n_dims != 4 || !is_dims_known(attr)) return arr;
+    py::buffer_info bi = arr.request();
+    if (bi.ndim != 4) return arr;
+
+    auto matches = [&](uint32_t d1, uint32_t d2, uint32_t d3) -> bool {
+        return bi.shape[1] == d1 && bi.shape[2] == d2 && bi.shape[3] == d3;
+    };
+
+    if (attr.fmt == RKNN_TENSOR_NHWC) {
+        if (matches(attr.dims[1], attr.dims[2], attr.dims[3])) return arr;
+        if (matches(attr.dims[3], attr.dims[1], attr.dims[2])) {
+            return py::array::ensure(arr.attr("transpose")(py::make_tuple(0, 2, 3, 1)), py::array::c_style);
+        }
+    } else if (attr.fmt == RKNN_TENSOR_NCHW) {
+        if (matches(attr.dims[1], attr.dims[2], attr.dims[3])) return arr;
+        if (matches(attr.dims[2], attr.dims[3], attr.dims[1])) {
+            return py::array::ensure(arr.attr("transpose")(py::make_tuple(0, 3, 1, 2)), py::array::c_style);
+        }
+    }
+    return arr;
+}
+
+static PreparedInput prepare_input_tensor(py::handle handle, const rknn_tensor_attr& attr, bool capture_shape) {
+    py::array arr = handle.cast<py::array>();
+    py::array contiguous = py::array::ensure(arr, py::array::c_style);
+    contiguous = align_layout(contiguous, attr);
+    contiguous = ensure_dtype(contiguous, attr.type);
+    auto bi = contiguous.request();
+
+    rknn_input tensor{};
+    tensor.index = attr.index;
+    tensor.type = attr.type;
+    tensor.fmt = attr.fmt;
+    tensor.size = static_cast<uint32_t>(contiguous.nbytes());
+    tensor.buf = const_cast<void*>(bi.ptr);
+
+    std::vector<uint32_t> shape;
+    if (capture_shape) {
+        shape.resize(bi.ndim);
+        for (ssize_t d = 0; d < bi.ndim; ++d) {
+            shape[d] = static_cast<uint32_t>(bi.shape[d]);
+        }
+    }
+    return {tensor, std::move(shape), contiguous};
+}
+
+static int find_matching_shape(const rknn_input_range& rng, const std::vector<uint32_t>& provided) {
+    if (provided.size() != rng.n_dims) return rng.shape_number - 1;
+    for (uint32_t s = 0; s < rng.shape_number; ++s) {
+        bool match = true;
+        for (uint32_t d = 0; d < rng.n_dims && match; ++d)
+            match = (rng.dyn_range[s][d] == provided[d]);
+        if (match) return s;
+    }
+    return rng.shape_number - 1;
+}
+
+static py::array make_output_array(const rknn_tensor_attr& attr, const rknn_output& out) {
+    std::vector<ssize_t> shape(attr.n_dims == 0 ? 1 : attr.n_dims);
+    if (attr.n_dims == 0) {
+        shape[0] = static_cast<ssize_t>(out.size / sizeof(float));
+    } else {
+        std::copy(attr.dims, attr.dims + attr.n_dims, shape.begin());
+    }
+    py::array arr(py::dtype::of<float>(), shape);
+    std::memcpy(arr.mutable_data(), out.buf, out.size);
+    return arr;
+}
+
+
+struct RknnCtx {
+    rknn_context ctx = 0;
+    rknn_input_output_num io_num{};
+    std::vector<rknn_tensor_attr> input_attrs;
+    std::vector<rknn_tensor_attr> output_attrs;
+
+    void query_io() {
+        if (rknn_query(ctx, RKNN_QUERY_IN_OUT_NUM, &io_num, sizeof(io_num)) != RKNN_SUCC)
+            throw std::runtime_error("rknn_query IN_OUT_NUM failed");
+        input_attrs.resize(io_num.n_input);
+        output_attrs.resize(io_num.n_output);
+        for (uint32_t i = 0; i < io_num.n_input; ++i) {
+            rknn_tensor_attr attr{};
+            attr.index = i;
+            if (rknn_query(ctx, RKNN_QUERY_INPUT_ATTR, &attr, sizeof(attr)) != RKNN_SUCC)
+                throw std::runtime_error("rknn_query INPUT_ATTR failed");
+            input_attrs[i] = attr;
+        }
+        for (uint32_t i = 0; i < io_num.n_output; ++i) {
+            rknn_tensor_attr attr{};
+            attr.index = i;
+            if (rknn_query(ctx, RKNN_QUERY_OUTPUT_ATTR, &attr, sizeof(attr)) != RKNN_SUCC)
+                throw std::runtime_error("rknn_query OUTPUT_ATTR failed");
+            output_attrs[i] = attr;
+        }
+    }
+};
+
+static void debug_print_io_info(const RknnCtx& ctx) {
+    std::cerr << "[rknn2] model: inputs=" << ctx.io_num.n_input
+              << " outputs=" << ctx.io_num.n_output << std::endl;
+    std::cerr << "input tensors:" << std::endl;
+    for (uint32_t i = 0; i < ctx.io_num.n_input; ++i) {
+        const auto& a = ctx.input_attrs[i];
+        std::cerr << "  in[" << i << "] fmt=" << a.fmt << " dims=[";
+        for (uint32_t d = 0; d < a.n_dims; ++d) {
+            std::cerr << a.dims[d] << (d + 1 < a.n_dims ? "," : "");
+        }
+        std::cerr << "]" << std::endl;
+    }
+    std::cerr << "output tensors:" << std::endl;
+    for (uint32_t i = 0; i < ctx.io_num.n_output; ++i) {
+        const auto& a = ctx.output_attrs[i];
+        std::cerr << "  out[" << i << "] fmt=" << a.fmt << " dims=[";
+        for (uint32_t d = 0; d < a.n_dims; ++d) {
+            std::cerr << a.dims[d] << (d + 1 < a.n_dims ? "," : "");
+        }
+        std::cerr << "]" << std::endl;
+    }
+}
+
+static const rknn_tensor_attr& resolve_output_attr(bool is_dynamic, RknnCtx& ctx, uint32_t index, rknn_tensor_attr& scratch) {
+    if (!is_dynamic) return ctx.output_attrs[index];
+    scratch.index = index;
+    if (rknn_query(ctx.ctx, RKNN_QUERY_CURRENT_OUTPUT_ATTR, &scratch, sizeof(scratch)) == RKNN_SUCC) {
+        return scratch;
+    }
+    return ctx.output_attrs[index];
+}
+
+class NativeRKNNExecutor {
+public:
+    explicit NativeRKNNExecutor(const std::string& model_path, int num_workers)
+        : rr_index_(0),
+          is_dynamic_model_(false) {
+        if (num_workers < 1) throw std::invalid_argument("num_workers must be >= 1");
+        if (num_workers > 3) throw std::invalid_argument("num_workers must be <= 3");
+        const bool debug_ctor = (std::getenv("RKNN_EXEC_DEBUG") != nullptr);
+
+        RknnCtx master;
+        if (rknn_init(&master.ctx, const_cast<char*>(model_path.c_str()), 0, 0, nullptr) != RKNN_SUCC)
+            throw std::runtime_error("rknn_init failed");
+        master.query_io();
+        if (debug_ctor) debug_print_io_info(master);
+
+        input_ranges_.resize(master.io_num.n_input);
+        for (uint32_t i = 0; i < master.io_num.n_input; ++i) {
+            rknn_input_range rng{};
+            rng.index = i;
+            if (rknn_query(master.ctx, RKNN_QUERY_INPUT_DYNAMIC_RANGE, &rng, sizeof(rng)) == RKNN_SUCC
+                && rng.shape_number > 0 && rng.n_dims > 0) {
+                is_dynamic_model_ = true;
+                input_ranges_[i] = rng;
+            }
+        }
+        contexts_.push_back(std::move(master));
+
+        for (int i = 1; i < num_workers; ++i) {
+            RknnCtx child;
+            if (rknn_dup_context(&contexts_[0].ctx, &child.ctx) != RKNN_SUCC)
+                throw std::runtime_error("rknn_dup_context failed");
+            child.query_io();
+            contexts_.push_back(std::move(child));
+        }
+        ctx_busy_.assign(contexts_.size(), false);
+    }
+
+    ~NativeRKNNExecutor() {
+        for (auto& c : contexts_) {
+            if (c.ctx) rknn_destroy(c.ctx);
+        }
+    }
+
+    py::dict get_io_info() const {
+        py::dict info;
+        const RknnCtx& master = contexts_.front();
+        info["is_dynamic"] = is_dynamic_model_;
+        py::list inputs(master.io_num.n_input);
+        for (uint32_t i = 0; i < master.io_num.n_input; ++i) {
+            py::dict desc = make_tensor_info(i, master.input_attrs[i]);
+            if (is_dynamic_model_ && i < input_ranges_.size()) {
+                const auto& rng = input_ranges_[i];
+                if (rng.shape_number > 0 && rng.n_dims > 0) {
+                    desc["dynamic"] = make_dynamic_dict(rng);
+                }
+            }
+            inputs[i] = desc;
+        }
+        info["inputs"] = inputs;
+        py::list outputs(master.io_num.n_output);
+        for (uint32_t i = 0; i < master.io_num.n_output; ++i) {
+            outputs[i] = make_tensor_info(i, master.output_attrs[i]);
+        }
+        info["outputs"] = outputs;
+        return info;
+    }
+
+    py::list infer(const py::list& inputs) {
+        auto ctx_handle = acquire_ctx_();
+        auto& c = ctx_handle.ctx;
+        if (inputs.size() != c.io_num.n_input)
+            throw std::runtime_error("Input count mismatch");
+
+        std::vector<rknn_input> in(c.io_num.n_input);
+        std::vector<py::array> keep_alive;
+        keep_alive.reserve(c.io_num.n_input);
+        std::vector<std::vector<uint32_t>> input_shapes;
+        if (is_dynamic_model_) input_shapes.reserve(c.io_num.n_input);
+
+        for (uint32_t i = 0; i < c.io_num.n_input; ++i) {
+            PreparedInput prepared = prepare_input_tensor(inputs[i], c.input_attrs[i], is_dynamic_model_);
+            in[i] = prepared.tensor;
+            if (is_dynamic_model_) input_shapes.push_back(std::move(prepared.shape));
+            keep_alive.push_back(std::move(prepared.buffer));
+        }
+        if (is_dynamic_model_) set_dynamic_shapes(c, input_shapes);
+
+        std::vector<rknn_output> out(c.io_num.n_output);
+        {
+            py::gil_scoped_release nogil;
+            if (rknn_inputs_set(c.ctx, c.io_num.n_input, in.data()) != RKNN_SUCC)
+                throw std::runtime_error("rknn_inputs_set failed");
+            if (rknn_run(c.ctx, nullptr) != RKNN_SUCC)
+                throw std::runtime_error("rknn_run failed");
+            for (uint32_t i = 0; i < c.io_num.n_output; ++i) {
+                out[i] = {};
+                out[i].want_float = 1;
+                out[i].index = i;
+            }
+            if (rknn_outputs_get(c.ctx, c.io_num.n_output, out.data(), nullptr) != RKNN_SUCC)
+                throw std::runtime_error("rknn_outputs_get failed");
+        }
+
+        py::list result(c.io_num.n_output);
+        rknn_tensor_attr scratch{};
+        for (uint32_t i = 0; i < c.io_num.n_output; ++i) {
+            const auto& attr = resolve_output_attr(is_dynamic_model_, c, i, scratch);
+            result[i] = make_output_array(attr, out[i]);
+        }
+        {
+            py::gil_scoped_release nogil;
+            rknn_outputs_release(c.ctx, c.io_num.n_output, out.data());
+        }
+        return result;
+    }
+
+private:
+    struct CtxHandle {
+        RknnCtx& ctx;
+        NativeRKNNExecutor& owner;
+        size_t index;
+        CtxHandle(RknnCtx& ctx_ref, NativeRKNNExecutor& parent, size_t idx)
+            : ctx(ctx_ref), owner(parent), index(idx) {}
+        ~CtxHandle() {
+            owner.release_ctx_(index);
+        }
+    };
+
+    CtxHandle acquire_ctx_() {
+        std::unique_lock<std::mutex> lock(ctx_mutex_);
+        // Callers hold the GIL here; the Python-side pool submits at most
+        // num_workers concurrent infer() calls (one per context), so this wait
+        // is not expected to block in practice.
+        ctx_cv_.wait(lock, [&]{
+            for (bool busy : ctx_busy_) {
+                if (!busy) return true;
+            }
+            return false;
+        });
+        size_t start = rr_index_;
+        for (size_t attempt = 0; attempt < ctx_busy_.size(); ++attempt) {
+            size_t idx = (start + attempt) % ctx_busy_.size();
+            if (!ctx_busy_[idx]) {
+                ctx_busy_[idx] = true;
+                rr_index_ = (idx + 1) % ctx_busy_.size();
+                return CtxHandle(contexts_[idx], *this, idx);
+            }
+        }
+        throw std::runtime_error("No RKNN context available");
+    }
+
+    void release_ctx_(size_t idx) {
+        std::lock_guard<std::mutex> lock(ctx_mutex_);
+        ctx_busy_[idx] = false;
+        ctx_cv_.notify_one();
+    }
+    void set_dynamic_shapes(RknnCtx& ctx, const std::vector<std::vector<uint32_t>>& input_shapes) const;
+
+private:
+    std::mutex ctx_mutex_;
+    std::condition_variable ctx_cv_;
+    size_t rr_index_;
+    std::vector<RknnCtx> contexts_;
+    std::vector<bool> ctx_busy_;
+    bool is_dynamic_model_;
+    std::vector<rknn_input_range> input_ranges_;
+};
+
+void NativeRKNNExecutor::set_dynamic_shapes(RknnCtx& ctx, const std::vector<std::vector<uint32_t>>& input_shapes) const {
+    std::vector<rknn_tensor_attr> attrs = ctx.input_attrs;
+    for (uint32_t i = 0; i < ctx.io_num.n_input && i < input_ranges_.size() && i < input_shapes.size(); ++i) {
+        const auto& rng = input_ranges_[i];
+        if (rng.shape_number == 0 || rng.n_dims == 0) continue;
+        int match_idx = find_matching_shape(rng, input_shapes[i]);
+        auto& attr = attrs[i];
+        attr.n_dims = rng.n_dims;
+        for (uint32_t d = 0; d < rng.n_dims && d < RKNN_MAX_DIMS; ++d) {
+            attr.dims[d] = rng.dyn_range[match_idx][d];
+        }
+        if (rng.fmt == RKNN_TENSOR_NHWC || rng.fmt == RKNN_TENSOR_NCHW || rng.fmt == RKNN_TENSOR_UNDEFINED)
+            attr.fmt = rng.fmt;
+        attr.type = ctx.input_attrs[i].type;
+    }
+    if (rknn_set_input_shapes(ctx.ctx, ctx.io_num.n_input, attrs.data()) != RKNN_SUCC)
+        throw std::runtime_error("Failed to set input shapes for dynamic model");
+}
+
+PYBIND11_MODULE(rknn_pool, m) {
+    py::class_<NativeRKNNExecutor>(m, "NativeRKNNExecutor")
+        .def(py::init<const std::string&, int>(), py::arg("model_path"), py::arg("num_workers") = 1)
+        .def("set_core_mask_all",
+             [](NativeRKNNExecutor& /*self*/, int /*mask*/) {
+                 // Placeholder for API compatibility; no-op.
+ }, + py::arg("mask"), + "Set the NPU core mask for all contexts (no-op placeholder).") + .def("infer", &NativeRKNNExecutor::infer, py::arg("inputs"), + "Run inference with a list of numpy arrays, returns list of numpy arrays.") + .def("get_io_info", &NativeRKNNExecutor::get_io_info, + "Return a dict with model IO info, including dynamic input ranges when available."); +} + + diff --git a/machine-learning/immich_ml/sessions/rknn/rknnpool.py b/machine-learning/immich_ml/sessions/rknn/rknnpool.py deleted file mode 100644 index fd0af8bcc4..0000000000 --- a/machine-learning/immich_ml/sessions/rknn/rknnpool.py +++ /dev/null @@ -1,91 +0,0 @@ -# This code is from leafqycc/rknn-multi-threaded -# Following Apache License 2.0 - -import logging -from concurrent.futures import Future, ThreadPoolExecutor -from pathlib import Path -from queue import Queue -from typing import Callable - -import numpy as np -from numpy.typing import NDArray - -from immich_ml.config import log -from immich_ml.models.constants import RKNN_COREMASK_SUPPORTED_SOCS, RKNN_SUPPORTED_SOCS - - -def get_soc(device_tree_path: Path | str) -> str | None: - try: - with Path(device_tree_path).open() as f: - device_compatible_str = f.read() - for soc in RKNN_SUPPORTED_SOCS: - if soc in device_compatible_str: - return soc - log.warning("Device is not supported for RKNN") - except OSError as e: - log.warning(f"Could not read {device_tree_path}. Reason: %s", e) - return None - - -soc_name = None -is_available = False -try: - from rknnlite.api import RKNNLite - - soc_name = get_soc("/proc/device-tree/compatible") - is_available = soc_name is not None -except ImportError: - log.debug("RKNN is not available") - - -def init_rknn(model_path: str) -> "RKNNLite": - if not is_available: - raise RuntimeError("rknn is not available!") - rknn_lite = RKNNLite() - rknn_lite.rknn_log.logger.setLevel(logging.ERROR) - ret = rknn_lite.load_rknn(model_path) - if ret != 0: - raise RuntimeError("Failed to load RKNN model") - - if soc_name in RKNN_COREMASK_SUPPORTED_SOCS: - ret = rknn_lite.init_runtime(core_mask=RKNNLite.NPU_CORE_AUTO) - else: - ret = rknn_lite.init_runtime() # Please do not set this parameter on other platforms. 
- - if ret != 0: - raise RuntimeError("Failed to initialize RKNN runtime environment") - - return rknn_lite - - -class RknnPoolExecutor: - def __init__( - self, - model_path: str, - tpes: int, - func: Callable[["RKNNLite", list[NDArray[np.float32]]], list[NDArray[np.float32]]], - ) -> None: - self.tpes = tpes - self.queue: Queue[Future[list[NDArray[np.float32]]]] = Queue() - self.rknn_pool = [init_rknn(model_path) for _ in range(tpes)] - self.pool = ThreadPoolExecutor(max_workers=tpes) - self.func = func - self.num = 0 - - def put(self, inputs: list[NDArray[np.float32]]) -> None: - self.queue.put(self.pool.submit(self.func, self.rknn_pool[self.num % self.tpes], inputs)) - self.num += 1 - - def get(self) -> list[NDArray[np.float32]] | None: - if self.queue.empty(): - return None - fut = self.queue.get() - return fut.result() - - def release(self) -> None: - self.pool.shutdown() - for rknn_lite in self.rknn_pool: - rknn_lite.release() - - def __del__(self) -> None: - self.release() diff --git a/machine-learning/pyproject.toml b/machine-learning/pyproject.toml index 25b936bc36..5145345614 100644 --- a/machine-learning/pyproject.toml +++ b/machine-learning/pyproject.toml @@ -53,7 +53,10 @@ cpu = ["onnxruntime>=1.15.0,<2"] cuda = ["onnxruntime-gpu>=1.17.0,<2"] openvino = ["onnxruntime-openvino>=1.17.1,<1.19.0"] armnn = ["onnxruntime>=1.15.0,<2"] -rknn = ["onnxruntime>=1.15.0,<2", "rknn-toolkit-lite2>=2.3.0,<3"] +rknn = [ + "onnxruntime>=1.15.0,<2", + "pybind11>=2.11.0,<4.0", +] rocm = [] [tool.uv]
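Reviewer note: a minimal end-to-end sketch of the new session API, mirroring inference_test.py (the model path and input shape are illustrative):

    import numpy as np
    from immich_ml.sessions.rknn import RknnSession

    session = RknnSession("/models/detection/model.rknn", num_workers=3)  # illustrative path
    try:
        name = session.get_inputs()[0].name or "input"
        x = np.random.rand(1, 3, 640, 640).astype(np.float32)
        outs = session.run(None, {name: x})        # blocking, onnxruntime-style call
        fut = session.run_async(None, {name: x})   # returns Future[RKNNInferenceResult]
        print(fut.result().duration_s, [o.shape for o in outs])
    finally:
        session.close()

Inputs with batch size > 1 are split into per-sample requests, fanned out across the duplicated RKNN contexts, and the per-output arrays are concatenated back along axis 0, as implemented in RknnSession.run_async.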