From fa65ec214d07a401814abcfa95db42a948247747 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=93=9D=E5=BE=90=E7=9D=BF?= Date: Tue, 16 Dec 2025 11:42:21 +0800 Subject: [PATCH 1/2] feat(ml): add padded fallback for face detection Add an optional fallback pass that pads the input image and re-runs detection to handle large/edge faces.\n\n- Expose fallbackPadRatio via model kwargs and configure().\n- Return empty outputs when no boxes/landmarks are produced. --- .../models/facial_recognition/detection.py | 57 +++++++++++++++++-- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/machine-learning/immich_ml/models/facial_recognition/detection.py b/machine-learning/immich_ml/models/facial_recognition/detection.py index 26c9208e48..6fa86c2e6b 100644 --- a/machine-learning/immich_ml/models/facial_recognition/detection.py +++ b/machine-learning/immich_ml/models/facial_recognition/detection.py @@ -1,5 +1,6 @@ from typing import Any +import cv2 import numpy as np from insightface.model_zoo import RetinaFace from numpy.typing import NDArray @@ -15,6 +16,9 @@ class FaceDetector(InferenceModel): def __init__(self, model_name: str, min_score: float = 0.7, **model_kwargs: Any) -> None: self.min_score = model_kwargs.pop("minScore", min_score) + # 对“大脸/贴边脸”场景做兜底:当首轮检测无结果时,对图像做 padding 再检测。 + # 这不依赖模型是否支持动态 det_size(即使 ONNX 是固定 640x640 也有效)。 + self.fallback_pad_ratio = float(model_kwargs.pop("fallbackPadRatio", 0.25)) super().__init__(model_name, **model_kwargs) def _load(self) -> ModelSession: @@ -25,17 +29,60 @@ class FaceDetector(InferenceModel): return session def _predict(self, inputs: NDArray[np.uint8] | bytes) -> FaceDetectionOutput: - inputs = decode_cv2(inputs) - - bboxes, landmarks = self._detect(inputs) + img = decode_cv2(inputs) + bboxes, landmarks = self._detect(img) + if landmarks is None or bboxes.shape[0] == 0: + # 人脸识别依赖关键点进行对齐裁剪;无关键点时返回空结果比输出错误 embedding 更安全。 + return { + "boxes": np.zeros((0, 4), dtype=np.float32), + "scores": np.zeros((0,), dtype=np.float32), + "landmarks": np.zeros((0, 5, 2), dtype=np.float32), + } return { "boxes": bboxes[:, :4].round(), "scores": bboxes[:, 4], "landmarks": landmarks, } - def _detect(self, inputs: NDArray[np.uint8] | bytes) -> tuple[NDArray[np.float32], NDArray[np.float32]]: - return self.model.detect(inputs) # type: ignore + def _detect(self, img: NDArray[np.uint8]) -> tuple[NDArray[np.float32], NDArray[np.float32] | None]: + bboxes, landmarks = self.model.detect(img) # type: ignore + if bboxes.shape[0] > 0: + return bboxes, landmarks + + # 首轮无检测结果时,尝试对图像加边框后再检测,以降低“大脸”相对尺度。 + if self.fallback_pad_ratio <= 0: + return bboxes, landmarks + + padded, (pad_top, pad_left) = self._pad_image(img, self.fallback_pad_ratio) + bboxes2, landmarks2 = self.model.detect(padded) # type: ignore + if bboxes2.shape[0] == 0: + return bboxes, landmarks + + # 将 padded 坐标系映射回原图坐标系。 + bboxes2 = bboxes2.copy() + bboxes2[:, 0] -= pad_left + bboxes2[:, 2] -= pad_left + bboxes2[:, 1] -= pad_top + bboxes2[:, 3] -= pad_top + + if landmarks2 is not None: + landmarks2 = landmarks2.copy() + landmarks2[:, :, 0] -= pad_left + landmarks2[:, :, 1] -= pad_top + + return bboxes2, landmarks2 + + @staticmethod + def _pad_image(img: NDArray[np.uint8], pad_ratio: float) -> tuple[NDArray[np.uint8], tuple[int, int]]: + """给图像四周增加 padding,用于改善超大脸/贴边脸的检测稳定性。""" + h, w = img.shape[:2] + pad = int(round(max(h, w) * pad_ratio)) + pad = max(pad, 1) + # 使用反射边界减少纯黑 padding 对检测的干扰。 + padded = cv2.copyMakeBorder(img, pad, pad, pad, pad, borderType=cv2.BORDER_REFLECT_101) + return np.asarray(padded, dtype=np.uint8), (pad, pad) def configure(self, **kwargs: Any) -> None: self.model.det_thresh = kwargs.pop("minScore", self.model.det_thresh) + if (pad_ratio := kwargs.pop("fallbackPadRatio", None)) is not None: + self.fallback_pad_ratio = float(pad_ratio) From 27e235a015e387bec838be5c968408f2f4266cd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=93=9D=E5=BE=90=E7=9D=BF?= Date: Tue, 16 Dec 2025 11:42:27 +0800 Subject: [PATCH 2/2] feat(ml): pad crops for robust face alignment Pad the full image (reflect border) when landmarks are near/outside the image bounds, so norm_crop() stays stable for large/edge faces.\n\n- Add cropMarginRatio/maxCropPadRatio knobs via kwargs and configure().\n- Ensure returned embeddings are float32. --- .../models/facial_recognition/recognition.py | 59 ++++++++++++++++++- 1 file changed, 56 insertions(+), 3 deletions(-) diff --git a/machine-learning/immich_ml/models/facial_recognition/recognition.py b/machine-learning/immich_ml/models/facial_recognition/recognition.py index 759992a600..2f3e905d10 100644 --- a/machine-learning/immich_ml/models/facial_recognition/recognition.py +++ b/machine-learning/immich_ml/models/facial_recognition/recognition.py @@ -1,6 +1,7 @@ from pathlib import Path from typing import Any +import cv2 import numpy as np import onnx import onnxruntime as ort @@ -28,6 +29,10 @@ class FaceRecognizer(InferenceModel): identity = (ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION) def __init__(self, model_name: str, **model_kwargs: Any) -> None: + # 对齐裁剪兜底:当关键点过于贴边或超出图像边界时,先对整张图做 padding 再裁剪。 + # 该逻辑主要用于“超大脸/贴边脸”,避免 cv2.warpAffine 采样到大量黑边导致 embedding 质量变差。 + self.crop_margin_ratio = float(model_kwargs.pop("cropMarginRatio", 0.5)) + self.max_crop_pad_ratio = float(model_kwargs.pop("maxCropPadRatio", 0.5)) super().__init__(model_name, **model_kwargs) max_batch_size = settings.max_batch_size.facial_recognition if settings.max_batch_size else None self.batch_size = max_batch_size if max_batch_size else self._batch_size_default @@ -43,6 +48,12 @@ class FaceRecognizer(InferenceModel): ) return session + def configure(self, **kwargs: Any) -> None: + if (margin_ratio := kwargs.pop("cropMarginRatio", None)) is not None: + self.crop_margin_ratio = float(margin_ratio) + if (max_pad_ratio := kwargs.pop("maxCropPadRatio", None)) is not None: + self.max_crop_pad_ratio = float(max_pad_ratio) + def _predict( self, inputs: NDArray[np.uint8] | bytes | Image.Image, faces: FaceDetectionOutput ) -> FacialRecognitionOutput: @@ -55,12 +66,13 @@ class FaceRecognizer(InferenceModel): def _predict_batch(self, cropped_faces: list[NDArray[np.uint8]]) -> NDArray[np.float32]: if not self.batch_size or len(cropped_faces) <= self.batch_size: - embeddings: NDArray[np.float32] = self.model.get_feat(cropped_faces) + embeddings = np.asarray(self.model.get_feat(cropped_faces), dtype=np.float32) return embeddings batch_embeddings: list[NDArray[np.float32]] = [] for i in range(0, len(cropped_faces), self.batch_size): - batch_embeddings.append(self.model.get_feat(cropped_faces[i : i + self.batch_size])) + batch = cropped_faces[i : i + self.batch_size] + batch_embeddings.append(np.asarray(self.model.get_feat(batch), dtype=np.float32)) return np.concatenate(batch_embeddings, axis=0) def postprocess(self, faces: FaceDetectionOutput, embeddings: NDArray[np.float32]) -> FacialRecognitionOutput: @@ -74,7 +86,48 @@ class FaceRecognizer(InferenceModel): ] def _crop(self, image: NDArray[np.uint8], faces: FaceDetectionOutput) -> list[NDArray[np.uint8]]: - return [norm_crop(image, landmark) for landmark in faces["landmarks"]] + landmarks = faces["landmarks"].astype(np.float32, copy=False) + pad = self._compute_crop_pad(image, landmarks, self.crop_margin_ratio, self.max_crop_pad_ratio) + if pad > 0: + # 使用反射边界,避免纯黑 padding 影响对齐后的人脸纹理。 + padded = cv2.copyMakeBorder(image, pad, pad, pad, pad, borderType=cv2.BORDER_REFLECT_101) + padded = np.asarray(padded, dtype=np.uint8) + landmarks = landmarks + np.array([pad, pad], dtype=np.float32) + return [norm_crop(padded, landmark) for landmark in landmarks] + return [norm_crop(image, landmark) for landmark in landmarks] + + @staticmethod + def _compute_crop_pad( + image: NDArray[np.uint8], + landmarks: NDArray[np.float32], + margin_ratio: float, + max_pad_ratio: float, + ) -> int: + """根据关键点和期望边距计算需要的 padding 像素数。""" + h, w = image.shape[:2] + if landmarks.size == 0: + return 0 + + # 逐脸计算“关键点包围盒 + 边距”是否会超出图像边界,并取最大需要的 padding。 + max_needed = 0.0 + for lmk in landmarks: + min_x = float(np.min(lmk[:, 0])) + min_y = float(np.min(lmk[:, 1])) + max_x = float(np.max(lmk[:, 0])) + max_y = float(np.max(lmk[:, 1])) + face_size = max(max_x - min_x, max_y - min_y, 1.0) + margin = face_size * max(margin_ratio, 0.0) + + left_needed = max(0.0, (margin - min_x)) + top_needed = max(0.0, (margin - min_y)) + right_needed = max(0.0, (max_x + margin) - float(w - 1)) + bottom_needed = max(0.0, (max_y + margin) - float(h - 1)) + max_needed = max(max_needed, left_needed, top_needed, right_needed, bottom_needed) + + # 限制 padding 上限,避免异常关键点导致内存暴涨。 + max_allowed = max(h, w) * max(max_pad_ratio, 0.0) + pad = int(round(min(max_needed, max_allowed))) + return max(pad, 0) def _add_batch_axis(self, model_path: Path) -> None: log.debug(f"Adding batch axis to model {model_path}")