Queue metrics: report per-queue job counts through observable gauges

* MetricGroupRepository.setObservableGauge(name, valueCallback, options?):
  when the group is enabled and `name` has not been registered through this
  method before, remembers the callback and attaches it to
  metricService.getObservableGauge(name, options) so that every observation
  reports valueCallback().
* TelemetryService.onBootstrap: when ImmichTelemetry.Job is in the configured
  telemetry metrics, registers the gauges
  immich.queues.{queue}.waiting / .paused / .delayed / .active for every
  QueueName ({queue} is snakeCase(queueName)), refreshes the cached counts once
  from jobRepository.getJobCounts, then refreshes them with setInterval every
  QUEUE_METRICS_POLLING_INTERVAL (5000 ms). A failed refresh for one queue is
  logged at debug level and leaves that queue's previous values in place.
* TelemetryService.onShutdown (AppShutdown event) clears that interval.
* The JobStart / JobComplete handlers that added +1 / -1 to
  immich.queues.{queue}.active are removed; the gauge of the same name is now
  fed from the polled `active` count.
* Adds telemetry.service.spec.ts and a setObservableGauge stub on the metric
  group mock.

NOTE(review): the "should register queued metrics if enabled" test calls
onBootstrap(), which starts the polling interval, and nothing in the spec
calls onShutdown() afterwards. Consider an afterEach that calls
sut.onShutdown() so the timer does not outlive the test.

diff --git a/server/src/repositories/telemetry.repository.ts b/server/src/repositories/telemetry.repository.ts
index 5fbbb76cf7..0457cb73a8 100644
--- a/server/src/repositories/telemetry.repository.ts
+++ b/server/src/repositories/telemetry.repository.ts
@@ -24,6 +24,7 @@ type MetricGroupOptions = { enabled: boolean };
 
 export class MetricGroupRepository {
   private enabled = false;
+  private observableGauges = new Map<string, () => number>();
 
   constructor(private metricService: MetricService) {}
 
@@ -45,6 +46,15 @@ export class MetricGroupRepository {
     }
   }
 
+  setObservableGauge(name: string, valueCallback: () => number, options?: MetricOptions): void {
+    if (this.enabled && !this.observableGauges.has(name)) {
+      this.observableGauges.set(name, valueCallback);
+      this.metricService.getObservableGauge(name, options).addCallback((observableResult) => {
+        observableResult.observe(valueCallback());
+      });
+    }
+  }
+
   configure(options: MetricGroupOptions): this {
     this.enabled = options.enabled;
     return this;
diff --git a/server/src/services/telemetry.service.spec.ts b/server/src/services/telemetry.service.spec.ts
new file mode 100644
index 0000000000..a862041dae
--- /dev/null
+++ b/server/src/services/telemetry.service.spec.ts
@@ -0,0 +1,53 @@
+import { ImmichTelemetry, QueueName } from 'src/enum';
+import { TelemetryService } from 'src/services/telemetry.service';
+import { newTestService, ServiceMocks } from 'test/utils';
+
+describe(TelemetryService.name, () => {
+  let sut: TelemetryService;
+  let mocks: ServiceMocks;
+
+  beforeEach(() => {
+    ({ sut, mocks } = newTestService(TelemetryService));
+  });
+
+  it('should work', () => {
+    expect(sut).toBeDefined();
+  });
+
+  describe('onBootstrap', () => {
+    it('should register queued metrics if enabled', async () => {
+      mocks.config.getEnv.mockReturnValue({
+        telemetry: {
+          metrics: new Set([ImmichTelemetry.Job]),
+        },
+      } as any);
+
+      mocks.job.getJobCounts.mockResolvedValue({
+        waiting: 1,
+        paused: 2,
+        delayed: 3,
+        active: 0,
+        completed: 0,
+        failed: 0,
+      });
+
+      await sut.onBootstrap();
+
+      expect(mocks.telemetry.jobs.setObservableGauge).toHaveBeenCalledTimes(Object.keys(QueueName).length * 4);
+      expect(mocks.job.getJobCounts).toHaveBeenCalledTimes(Object.keys(QueueName).length);
+    });
+
+    it('should not register queued metrics if disabled', async () => {
+      mocks.config.getEnv.mockReturnValue({
+        telemetry: {
+          metrics: new Set(),
+        },
+      } as any);
+
+      await sut.onBootstrap();
+
+      expect(mocks.telemetry.jobs.setObservableGauge).not.toHaveBeenCalled();
+      expect(mocks.job.getJobCounts).not.toHaveBeenCalled();
+    });
+  });
+});
diff --git a/server/src/services/telemetry.service.ts b/server/src/services/telemetry.service.ts
index 7c4fe43214..db6ef66f18 100644
--- a/server/src/services/telemetry.service.ts
+++ b/server/src/services/telemetry.service.ts
@@ -1,14 +1,88 @@
 import { snakeCase } from 'lodash';
 import { OnEvent } from 'src/decorators';
-import { ImmichWorker, JobStatus } from 'src/enum';
-import { ArgOf, ArgsOf } from 'src/repositories/event.repository';
+import { ImmichTelemetry, ImmichWorker, JobStatus, QueueName } from 'src/enum';
+import { ArgOf } from 'src/repositories/event.repository';
 import { BaseService } from 'src/services/base.service';
 
+const QUEUE_METRICS_POLLING_INTERVAL = 5000;
+
 export class TelemetryService extends BaseService {
+  private queueWaitingCounts = new Map<string, number>();
+  private queuePausedCounts = new Map<string, number>();
+  private queueDelayedCounts = new Map<string, number>();
+  private queueActiveCounts = new Map<string, number>();
+  private pollingInterval?: NodeJS.Timeout;
+
   @OnEvent({ name: 'AppBootstrap', workers: [ImmichWorker.Api] })
   async onBootstrap(): Promise<void> {
     const userCount = await this.userRepository.getCount();
     this.telemetryRepository.api.addToGauge('immich.users.total', userCount);
+
+    const { telemetry } = this.configRepository.getEnv();
+    if (telemetry.metrics.has(ImmichTelemetry.Job)) {
+      // Register observable gauges for queued metrics
+      this.registerQueuedMetrics();
+
+      // Start polling queue statistics
+      await this.updateQueuedMetrics();
+      this.pollingInterval = setInterval(() => {
+        void this.updateQueuedMetrics();
+      }, QUEUE_METRICS_POLLING_INTERVAL);
+    }
+  }
+
+  @OnEvent({ name: 'AppShutdown' })
+  onShutdown(): void {
+    if (this.pollingInterval) {
+      clearInterval(this.pollingInterval);
+    }
+  }
+
+  private registerQueuedMetrics(): void {
+    for (const queueName of Object.values(QueueName)) {
+      const queueKey = snakeCase(queueName);
+
+      this.telemetryRepository.jobs.setObservableGauge(
+        `immich.queues.${queueKey}.waiting`,
+        () => this.queueWaitingCounts.get(queueKey) ?? 0,
+        { description: `Number of waiting jobs in ${queueName} queue` },
+      );
+
+      this.telemetryRepository.jobs.setObservableGauge(
+        `immich.queues.${queueKey}.paused`,
+        () => this.queuePausedCounts.get(queueKey) ?? 0,
+        { description: `Number of paused jobs in ${queueName} queue` },
+      );
+
+      this.telemetryRepository.jobs.setObservableGauge(
+        `immich.queues.${queueKey}.delayed`,
+        () => this.queueDelayedCounts.get(queueKey) ?? 0,
+        { description: `Number of delayed jobs in ${queueName} queue` },
+      );
+
+      this.telemetryRepository.jobs.setObservableGauge(
+        `immich.queues.${queueKey}.active`,
+        () => this.queueActiveCounts.get(queueKey) ?? 0,
+        { description: `Number of active jobs in ${queueName} queue` },
+      );
+    }
+  }
+
+  private async updateQueuedMetrics(): Promise<void> {
+    await Promise.all(
+      Object.values(QueueName).map(async (queueName) => {
+        try {
+          const stats = await this.jobRepository.getJobCounts(queueName);
+          const queueKey = snakeCase(queueName);
+          this.queueWaitingCounts.set(queueKey, stats.waiting);
+          this.queuePausedCounts.set(queueKey, stats.paused);
+          this.queueDelayedCounts.set(queueKey, stats.delayed);
+          this.queueActiveCounts.set(queueKey, stats.active);
+        } catch (error) {
+          this.logger.debug(`Failed to update queued metrics for ${queueName}: ${error}`);
+        }
+      }),
+    );
   }
 
   @OnEvent({ name: 'UserCreate' })
@@ -26,12 +100,6 @@ export class TelemetryService extends BaseService {
     this.telemetryRepository.api.addToGauge(`immich.users.total`, 1);
   }
 
-  @OnEvent({ name: 'JobStart' })
-  onJobStart(...[queueName]: ArgsOf<'JobStart'>) {
-    const queueMetric = `immich.queues.${snakeCase(queueName)}.active`;
-    this.telemetryRepository.jobs.addToGauge(queueMetric, 1);
-  }
-
   @OnEvent({ name: 'JobSuccess' })
   onJobSuccess({ job, response }: ArgOf<'JobSuccess'>) {
     if (response && Object.values(JobStatus).includes(response as JobStatus)) {
@@ -46,12 +114,6 @@ export class TelemetryService extends BaseService {
     this.telemetryRepository.jobs.addToCounter(jobMetric, 1);
   }
 
-  @OnEvent({ name: 'JobComplete' })
-  onJobComplete(...[queueName]: ArgsOf<'JobComplete'>) {
-    const queueMetric = `immich.queues.${snakeCase(queueName)}.active`;
-    this.telemetryRepository.jobs.addToGauge(queueMetric, -1);
-  }
-
   @OnEvent({ name: 'QueueStart' })
   onQueueStart({ name }: ArgOf<'QueueStart'>) {
     this.telemetryRepository.jobs.addToCounter(`immich.queues.${snakeCase(name)}.started`, 1);
diff --git a/server/test/repositories/telemetry.repository.mock.ts b/server/test/repositories/telemetry.repository.mock.ts
index c7442052da..919ebee790 100644
--- a/server/test/repositories/telemetry.repository.mock.ts
+++ b/server/test/repositories/telemetry.repository.mock.ts
@@ -7,6 +7,7 @@ const newMetricGroupMock = () => {
     addToCounter: vitest.fn(),
     addToGauge: vitest.fn(),
     addToHistogram: vitest.fn(),
+    setObservableGauge: vitest.fn(),
     configure: vitest.fn(),
   };
 };