diff --git a/server/src/enum.ts b/server/src/enum.ts index 27cab3fb5e..b603d7a3a7 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -920,6 +920,8 @@ export enum DatabaseLock { MaintenanceOperation = 621, MemoryCreation = 777, VersionCheck = 800, + FacialRecognition = 900, + DuplicateDetection = 1000, } export enum MaintenanceAction { diff --git a/server/src/services/duplicate.service.ts b/server/src/services/duplicate.service.ts index 6e9e62ba0b..f854796b69 100644 --- a/server/src/services/duplicate.service.ts +++ b/server/src/services/duplicate.service.ts @@ -5,7 +5,7 @@ import { BulkIdErrorReason, BulkIdResponseDto, BulkIdsDto } from 'src/dtos/asset import { MapAsset, mapAsset } from 'src/dtos/asset-response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; import { DuplicateResolveDto, DuplicateResolveGroupDto, DuplicateResponseDto } from 'src/dtos/duplicate.dto'; -import { AssetStatus, AssetVisibility, JobName, JobStatus, Permission, QueueName } from 'src/enum'; +import { AssetStatus, AssetVisibility, DatabaseLock, JobName, JobStatus, Permission, QueueName } from 'src/enum'; import { AssetDuplicateResult } from 'src/repositories/search.repository'; import { BaseService } from 'src/services/base.service'; import { JobItem, JobOf } from 'src/types'; @@ -326,60 +326,62 @@ export class DuplicateService extends BaseService { @OnJob({ name: JobName.AssetDetectDuplicates, queue: QueueName.DuplicateDetection }) async handleSearchDuplicates({ id }: JobOf): Promise { - const { machineLearning } = await this.getConfig({ withCache: true }); - if (!isDuplicateDetectionEnabled(machineLearning)) { - return JobStatus.Skipped; - } + return this.databaseRepository.withLock(DatabaseLock.DuplicateDetection, async () => { + const { machineLearning } = await this.getConfig({ withCache: true }); + if (!isDuplicateDetectionEnabled(machineLearning)) { + return JobStatus.Skipped; + } - const asset = await this.assetJobRepository.getForSearchDuplicatesJob(id); - if (!asset) { - this.logger.error(`Asset ${id} not found`); - return JobStatus.Failed; - } + const asset = await this.assetJobRepository.getForSearchDuplicatesJob(id); + if (!asset) { + this.logger.error(`Asset ${id} not found`); + return JobStatus.Failed; + } - if (asset.stackId) { - this.logger.debug(`Asset ${id} is part of a stack, skipping`); - return JobStatus.Skipped; - } + if (asset.stackId) { + this.logger.debug(`Asset ${id} is part of a stack, skipping`); + return JobStatus.Skipped; + } - if (asset.visibility === AssetVisibility.Hidden) { - this.logger.debug(`Asset ${id} is not visible, skipping`); - return JobStatus.Skipped; - } + if (asset.visibility === AssetVisibility.Hidden) { + this.logger.debug(`Asset ${id} is not visible, skipping`); + return JobStatus.Skipped; + } - if (asset.visibility === AssetVisibility.Locked) { - this.logger.debug(`Asset ${id} is locked, skipping`); - return JobStatus.Skipped; - } + if (asset.visibility === AssetVisibility.Locked) { + this.logger.debug(`Asset ${id} is locked, skipping`); + return JobStatus.Skipped; + } - if (!asset.embedding) { - this.logger.debug(`Asset ${id} is missing embedding`); - return JobStatus.Failed; - } + if (!asset.embedding) { + this.logger.debug(`Asset ${id} is missing embedding`); + return JobStatus.Failed; + } - const duplicateAssets = await this.duplicateRepository.search({ - assetId: asset.id, - embedding: asset.embedding, - maxDistance: machineLearning.duplicateDetection.maxDistance, - type: asset.type, - userIds: [asset.ownerId], + const duplicateAssets = await this.duplicateRepository.search({ + assetId: asset.id, + embedding: asset.embedding, + maxDistance: machineLearning.duplicateDetection.maxDistance, + type: asset.type, + userIds: [asset.ownerId], + }); + + let assetIds = [asset.id]; + if (duplicateAssets.length > 0) { + this.logger.debug( + `Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`, + ); + assetIds = await this.updateDuplicates(asset, duplicateAssets); + } else if (asset.duplicateId) { + this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`); + await this.assetRepository.update({ id: asset.id, duplicateId: null }); + } + + const duplicatesDetectedAt = new Date(); + await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt }))); + + return JobStatus.Success; }); - - let assetIds = [asset.id]; - if (duplicateAssets.length > 0) { - this.logger.debug( - `Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`, - ); - assetIds = await this.updateDuplicates(asset, duplicateAssets); - } else if (asset.duplicateId) { - this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`); - await this.assetRepository.update({ id: asset.id, duplicateId: null }); - } - - const duplicatesDetectedAt = new Date(); - await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt }))); - - return JobStatus.Success; } private async updateDuplicates( diff --git a/server/src/services/person.service.ts b/server/src/services/person.service.ts index fde5313f4d..e2a92a7f3b 100644 --- a/server/src/services/person.service.ts +++ b/server/src/services/person.service.ts @@ -25,6 +25,7 @@ import { import { AssetVisibility, CacheControl, + DatabaseLock, JobName, JobStatus, Permission, @@ -402,144 +403,148 @@ export class PersonService extends BaseService { @OnJob({ name: JobName.FacialRecognitionQueueAll, queue: QueueName.FacialRecognition }) async handleQueueRecognizeFaces({ force, nightly }: JobOf): Promise { - const { machineLearning } = await this.getConfig({ withCache: false }); - if (!isFacialRecognitionEnabled(machineLearning)) { - return JobStatus.Skipped; - } - - await this.jobRepository.waitForQueueCompletion(QueueName.ThumbnailGeneration, QueueName.FaceDetection); - - if (nightly) { - const [state, latestFaceDate] = await Promise.all([ - this.systemMetadataRepository.get(SystemMetadataKey.FacialRecognitionState), - this.personRepository.getLatestFaceDate(), - ]); - - if (state?.lastRun && latestFaceDate && state.lastRun > latestFaceDate) { - this.logger.debug('Skipping facial recognition nightly since no face has been added since the last run'); + return this.databaseRepository.withLock(DatabaseLock.FacialRecognition, async () => { + const { machineLearning } = await this.getConfig({ withCache: false }); + if (!isFacialRecognitionEnabled(machineLearning)) { return JobStatus.Skipped; } - } - const { waiting } = await this.jobRepository.getJobCounts(QueueName.FacialRecognition); + await this.jobRepository.waitForQueueCompletion(QueueName.ThumbnailGeneration, QueueName.FaceDetection); - if (force) { - await this.personRepository.unassignFaces({ sourceType: SourceType.MachineLearning }); - await this.handlePersonCleanup(); - await this.personRepository.vacuum({ reindexVectors: false }); - } else if (waiting) { - this.logger.debug( - `Skipping facial recognition queueing because ${waiting} job${waiting > 1 ? 's are' : ' is'} already queued`, - ); - return JobStatus.Skipped; - } + if (nightly) { + const [state, latestFaceDate] = await Promise.all([ + this.systemMetadataRepository.get(SystemMetadataKey.FacialRecognitionState), + this.personRepository.getLatestFaceDate(), + ]); - await this.databaseRepository.prewarm(VectorIndex.Face); - - const lastRun = new Date().toISOString(); - const facePagination = this.personRepository.getAllFaces( - force ? undefined : { personId: null, sourceType: SourceType.MachineLearning }, - ); - - let jobs: { name: JobName.FacialRecognition; data: { id: string; deferred: false } }[] = []; - for await (const face of facePagination) { - jobs.push({ name: JobName.FacialRecognition, data: { id: face.id, deferred: false } }); - - if (jobs.length === JOBS_ASSET_PAGINATION_SIZE) { - await this.jobRepository.queueAll(jobs); - jobs = []; + if (state?.lastRun && latestFaceDate && state.lastRun > latestFaceDate) { + this.logger.debug('Skipping facial recognition nightly since no face has been added since the last run'); + return JobStatus.Skipped; + } } - } - await this.jobRepository.queueAll(jobs); + const { waiting } = await this.jobRepository.getJobCounts(QueueName.FacialRecognition); - await this.systemMetadataRepository.set(SystemMetadataKey.FacialRecognitionState, { lastRun }); + if (force) { + await this.personRepository.unassignFaces({ sourceType: SourceType.MachineLearning }); + await this.handlePersonCleanup(); + await this.personRepository.vacuum({ reindexVectors: false }); + } else if (waiting) { + this.logger.debug( + `Skipping facial recognition queueing because ${waiting} job${waiting > 1 ? 's are' : ' is'} already queued`, + ); + return JobStatus.Skipped; + } - return JobStatus.Success; + await this.databaseRepository.prewarm(VectorIndex.Face); + + const lastRun = new Date().toISOString(); + const facePagination = this.personRepository.getAllFaces( + force ? undefined : { personId: null, sourceType: SourceType.MachineLearning }, + ); + + let jobs: { name: JobName.FacialRecognition; data: { id: string; deferred: false } }[] = []; + for await (const face of facePagination) { + jobs.push({ name: JobName.FacialRecognition, data: { id: face.id, deferred: false } }); + + if (jobs.length === JOBS_ASSET_PAGINATION_SIZE) { + await this.jobRepository.queueAll(jobs); + jobs = []; + } + } + + await this.jobRepository.queueAll(jobs); + + await this.systemMetadataRepository.set(SystemMetadataKey.FacialRecognitionState, { lastRun }); + + return JobStatus.Success; + }); } @OnJob({ name: JobName.FacialRecognition, queue: QueueName.FacialRecognition }) async handleRecognizeFaces({ id, deferred }: JobOf): Promise { - const { machineLearning } = await this.getConfig({ withCache: true }); - if (!isFacialRecognitionEnabled(machineLearning)) { - return JobStatus.Skipped; - } + return this.databaseRepository.withLock(DatabaseLock.FacialRecognition, async () => { + const { machineLearning } = await this.getConfig({ withCache: true }); + if (!isFacialRecognitionEnabled(machineLearning)) { + return JobStatus.Skipped; + } - const face = await this.personRepository.getFaceForFacialRecognitionJob(id); - if (!face || !face.asset) { - this.logger.warn(`Face ${id} not found`); - return JobStatus.Failed; - } + const face = await this.personRepository.getFaceForFacialRecognitionJob(id); + if (!face || !face.asset) { + this.logger.warn(`Face ${id} not found`); + return JobStatus.Failed; + } - if (face.sourceType !== SourceType.MachineLearning) { - this.logger.warn(`Skipping face ${id} due to source ${face.sourceType}`); - return JobStatus.Skipped; - } + if (face.sourceType !== SourceType.MachineLearning) { + this.logger.warn(`Skipping face ${id} due to source ${face.sourceType}`); + return JobStatus.Skipped; + } - if (!face.faceSearch?.embedding) { - this.logger.warn(`Face ${id} does not have an embedding`); - return JobStatus.Failed; - } + if (!face.faceSearch?.embedding) { + this.logger.warn(`Face ${id} does not have an embedding`); + return JobStatus.Failed; + } - if (face.personId) { - this.logger.debug(`Face ${id} already has a person assigned`); - return JobStatus.Skipped; - } + if (face.personId) { + this.logger.debug(`Face ${id} already has a person assigned`); + return JobStatus.Skipped; + } - const matches = await this.searchRepository.searchFaces({ - userIds: [face.asset.ownerId], - embedding: face.faceSearch.embedding, - maxDistance: machineLearning.facialRecognition.maxDistance, - numResults: machineLearning.facialRecognition.minFaces, - minBirthDate: new Date(face.asset.fileCreatedAt), - }); - - // `matches` also includes the face itself - if (machineLearning.facialRecognition.minFaces > 1 && matches.length <= 1) { - this.logger.debug(`Face ${id} only matched the face itself, skipping`); - return JobStatus.Skipped; - } - - this.logger.debug(`Face ${id} has ${matches.length} matches`); - - const isCore = - matches.length >= machineLearning.facialRecognition.minFaces && - face.asset.visibility === AssetVisibility.Timeline; - if (!isCore && !deferred) { - this.logger.debug(`Deferring non-core face ${id} for later processing`); - await this.jobRepository.queue({ name: JobName.FacialRecognition, data: { id, deferred: true } }); - return JobStatus.Skipped; - } - - let personId = matches.find((match) => match.personId)?.personId; - if (!personId) { - const matchWithPerson = await this.searchRepository.searchFaces({ + const matches = await this.searchRepository.searchFaces({ userIds: [face.asset.ownerId], embedding: face.faceSearch.embedding, maxDistance: machineLearning.facialRecognition.maxDistance, - numResults: 1, - hasPerson: true, + numResults: machineLearning.facialRecognition.minFaces, minBirthDate: new Date(face.asset.fileCreatedAt), }); - if (matchWithPerson.length > 0) { - personId = matchWithPerson[0].personId; + // `matches` also includes the face itself + if (machineLearning.facialRecognition.minFaces > 1 && matches.length <= 1) { + this.logger.debug(`Face ${id} only matched the face itself, skipping`); + return JobStatus.Skipped; } - } - if (isCore && !personId) { - this.logger.log(`Creating new person for face ${id}`); - const newPerson = await this.personRepository.create({ ownerId: face.asset.ownerId, faceAssetId: face.id }); - await this.jobRepository.queue({ name: JobName.PersonGenerateThumbnail, data: { id: newPerson.id } }); - personId = newPerson.id; - } + this.logger.debug(`Face ${id} has ${matches.length} matches`); - if (personId) { - this.logger.debug(`Assigning face ${id} to person ${personId}`); - await this.personRepository.reassignFaces({ faceIds: [id], newPersonId: personId }); - } + const isCore = + matches.length >= machineLearning.facialRecognition.minFaces && + face.asset.visibility === AssetVisibility.Timeline; + if (!isCore && !deferred) { + this.logger.debug(`Deferring non-core face ${id} for later processing`); + await this.jobRepository.queue({ name: JobName.FacialRecognition, data: { id, deferred: true } }); + return JobStatus.Skipped; + } - return JobStatus.Success; + let personId = matches.find((match) => match.personId)?.personId; + if (!personId) { + const matchWithPerson = await this.searchRepository.searchFaces({ + userIds: [face.asset.ownerId], + embedding: face.faceSearch.embedding, + maxDistance: machineLearning.facialRecognition.maxDistance, + numResults: 1, + hasPerson: true, + minBirthDate: new Date(face.asset.fileCreatedAt), + }); + + if (matchWithPerson.length > 0) { + personId = matchWithPerson[0].personId; + } + } + + if (isCore && !personId) { + this.logger.log(`Creating new person for face ${id}`); + const newPerson = await this.personRepository.create({ ownerId: face.asset.ownerId, faceAssetId: face.id }); + await this.jobRepository.queue({ name: JobName.PersonGenerateThumbnail, data: { id: newPerson.id } }); + personId = newPerson.id; + } + + if (personId) { + this.logger.debug(`Assigning face ${id} to person ${personId}`); + await this.personRepository.reassignFaces({ faceIds: [id], newPersonId: personId }); + } + + return JobStatus.Success; + }); } @OnJob({ name: JobName.PersonFileMigration, queue: QueueName.Migration })