Merge e5b23625a1 into 5ade152bc5
commit
6173b7b360
|
|
@ -24,6 +24,7 @@ type MetricGroupOptions = { enabled: boolean };
|
|||
|
||||
export class MetricGroupRepository {
|
||||
private enabled = false;
|
||||
private observableGauges = new Map<string, () => number>();
|
||||
|
||||
constructor(private metricService: MetricService) {}
|
||||
|
||||
|
|
@ -45,6 +46,15 @@ export class MetricGroupRepository {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Registers an observable gauge under `name` whose value is produced on demand
 * by `valueCallback`.
 *
 * Does nothing when this metric group is disabled, or when a gauge with the
 * same name has already been registered through this method.
 *
 * @param name - Metric name; also the key used to detect repeat registrations.
 * @param valueCallback - Invoked each time the gauge's callback runs; its
 *   return value is the observed measurement.
 * @param options - Optional settings forwarded unchanged to
 *   `metricService.getObservableGauge`.
 */
setObservableGauge(name: string, valueCallback: () => number, options?: MetricOptions): void {
  if (!this.enabled) {
    return;
  }
  if (this.observableGauges.has(name)) {
    return;
  }

  // Record the name first so later calls with the same name are ignored.
  this.observableGauges.set(name, valueCallback);

  const gauge = this.metricService.getObservableGauge(name, options);
  gauge.addCallback((result) => {
    result.observe(valueCallback());
  });
}
|
||||
|
||||
configure(options: MetricGroupOptions): this {
|
||||
this.enabled = options.enabled;
|
||||
return this;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,53 @@
|
|||
import { ImmichTelemetry, QueueName } from 'src/enum';
|
||||
import { TelemetryService } from 'src/services/telemetry.service';
|
||||
import { newTestService, ServiceMocks } from 'test/utils';
|
||||
|
||||
describe(TelemetryService.name, () => {
  let sut: TelemetryService;
  let mocks: ServiceMocks;

  // Every queue gets four gauges: waiting, paused, delayed and active.
  const queueCount = Object.keys(QueueName).length;
  const gaugesPerQueue = 4;

  beforeEach(() => {
    ({ sut, mocks } = newTestService(TelemetryService));
  });

  // onBootstrap starts a real polling interval when job telemetry is enabled.
  // Stop it after every test so no timer outlives the test that created it.
  afterEach(() => {
    sut.onShutdown();
  });

  it('should work', () => {
    expect(sut).toBeDefined();
  });

  describe('onBootstrap', () => {
    it('should register queued metrics if enabled', async () => {
      mocks.config.getEnv.mockReturnValue({
        telemetry: {
          metrics: new Set([ImmichTelemetry.Job]),
        },
      } as any);

      mocks.job.getJobCounts.mockResolvedValue({
        waiting: 1,
        paused: 2,
        delayed: 3,
        active: 0,
        completed: 0,
        failed: 0,
      });

      await sut.onBootstrap();

      // One registration per (queue, state) pair and one initial poll per queue.
      expect(mocks.telemetry.jobs.setObservableGauge).toHaveBeenCalledTimes(queueCount * gaugesPerQueue);
      expect(mocks.job.getJobCounts).toHaveBeenCalledTimes(queueCount);
    });

    it('should not register queued metrics if disabled', async () => {
      mocks.config.getEnv.mockReturnValue({
        telemetry: {
          metrics: new Set(),
        },
      } as any);

      await sut.onBootstrap();

      expect(mocks.telemetry.jobs.setObservableGauge).not.toHaveBeenCalled();
      expect(mocks.job.getJobCounts).not.toHaveBeenCalled();
    });
  });
});
|
||||
|
|
@ -1,14 +1,88 @@
|
|||
import { snakeCase } from 'lodash';
|
||||
import { OnEvent } from 'src/decorators';
|
||||
import { ImmichWorker, JobStatus } from 'src/enum';
|
||||
import { ArgOf, ArgsOf } from 'src/repositories/event.repository';
|
||||
import { ImmichTelemetry, ImmichWorker, JobStatus, QueueName } from 'src/enum';
|
||||
import { ArgOf } from 'src/repositories/event.repository';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
|
||||
const QUEUE_METRICS_POLLING_INTERVAL = 5000;
|
||||
|
||||
export class TelemetryService extends BaseService {
|
||||
private queueWaitingCounts = new Map<string, number>();
|
||||
private queuePausedCounts = new Map<string, number>();
|
||||
private queueDelayedCounts = new Map<string, number>();
|
||||
private queueActiveCounts = new Map<string, number>();
|
||||
private pollingInterval?: NodeJS.Timeout;
|
||||
|
||||
/**
 * Handles `AppBootstrap` on the API worker.
 *
 * Publishes the current user count to the `immich.users.total` gauge. When
 * the `ImmichTelemetry.Job` metric group is enabled in the environment config,
 * it also registers the per-queue observable gauges, takes one initial sample
 * of the queue counts, and then re-samples every
 * `QUEUE_METRICS_POLLING_INTERVAL` milliseconds until `onShutdown` runs.
 */
@OnEvent({ name: 'AppBootstrap', workers: [ImmichWorker.Api] })
async onBootstrap(): Promise<void> {
  const totalUsers = await this.userRepository.getCount();
  this.telemetryRepository.api.addToGauge('immich.users.total', totalUsers);

  const env = this.configRepository.getEnv();
  if (!env.telemetry.metrics.has(ImmichTelemetry.Job)) {
    return;
  }

  // Gauges read from the cached counts, so register them before sampling.
  this.registerQueuedMetrics();

  // Take the first sample before the periodic polling starts.
  await this.updateQueuedMetrics();

  const poll = (): void => {
    // Fire-and-forget: updateQueuedMetrics handles its own per-queue errors.
    void this.updateQueuedMetrics();
  };
  this.pollingInterval = setInterval(poll, QUEUE_METRICS_POLLING_INTERVAL);
}
|
||||
|
||||
/**
 * Handles `AppShutdown` by stopping the queue-metrics polling timer, if one
 * was started. Safe to call when polling never began.
 */
@OnEvent({ name: 'AppShutdown' })
onShutdown(): void {
  const timer = this.pollingInterval;
  if (!timer) {
    return;
  }
  clearInterval(timer);
}
|
||||
|
||||
/**
 * Registers four observable gauges (waiting, paused, delayed, active) on the
 * jobs metric group for every value of `QueueName`.
 *
 * Each gauge reports the most recently cached count for its queue, or 0 when
 * no count has been cached yet. The caches are filled by
 * `updateQueuedMetrics`.
 */
private registerQueuedMetrics(): void {
  // Shared registration step: wraps a cache lookup so a missing entry reads as 0.
  const register = (metric: string, read: () => number | undefined, description: string): void => {
    this.telemetryRepository.jobs.setObservableGauge(metric, () => read() ?? 0, { description });
  };

  for (const queueName of Object.values(QueueName)) {
    // Cache keys and metric names both use the snake_case form of the queue name.
    const queueKey = snakeCase(queueName);

    register(
      `immich.queues.${queueKey}.waiting`,
      () => this.queueWaitingCounts.get(queueKey),
      `Number of waiting jobs in ${queueName} queue`,
    );

    register(
      `immich.queues.${queueKey}.paused`,
      () => this.queuePausedCounts.get(queueKey),
      `Number of paused jobs in ${queueName} queue`,
    );

    register(
      `immich.queues.${queueKey}.delayed`,
      () => this.queueDelayedCounts.get(queueKey),
      `Number of delayed jobs in ${queueName} queue`,
    );

    register(
      `immich.queues.${queueKey}.active`,
      () => this.queueActiveCounts.get(queueKey),
      `Number of active jobs in ${queueName} queue`,
    );
  }
}
|
||||
|
||||
/**
 * Refreshes the cached waiting/paused/delayed/active counts for every value
 * of `QueueName`, querying all queues concurrently.
 *
 * A failure for one queue is logged at debug level and leaves that queue's
 * cached values as they were; it does not reject the returned promise or
 * affect the other queues.
 */
private async updateQueuedMetrics(): Promise<void> {
  const refreshQueue = async (queueName: QueueName): Promise<void> => {
    try {
      const counts = await this.jobRepository.getJobCounts(queueName);
      const key = snakeCase(queueName);
      this.queueWaitingCounts.set(key, counts.waiting);
      this.queuePausedCounts.set(key, counts.paused);
      this.queueDelayedCounts.set(key, counts.delayed);
      this.queueActiveCounts.set(key, counts.active);
    } catch (error) {
      this.logger.debug(`Failed to update queued metrics for ${queueName}: ${error}`);
    }
  };

  const pending = Object.values(QueueName).map((queueName) => refreshQueue(queueName));
  await Promise.all(pending);
}
|
||||
|
||||
@OnEvent({ name: 'UserCreate' })
|
||||
|
|
@ -26,12 +100,6 @@ export class TelemetryService extends BaseService {
|
|||
this.telemetryRepository.api.addToGauge(`immich.users.total`, 1);
|
||||
}
|
||||
|
||||
/**
 * Handles `JobStart` by adding 1 to the `immich.queues.<queue>.active` gauge,
 * where `<queue>` is the snake_case form of the queue name.
 */
@OnEvent({ name: 'JobStart' })
onJobStart(...[queueName]: ArgsOf<'JobStart'>) {
  const queueKey = snakeCase(queueName);
  this.telemetryRepository.jobs.addToGauge(`immich.queues.${queueKey}.active`, 1);
}
|
||||
|
||||
@OnEvent({ name: 'JobSuccess' })
|
||||
onJobSuccess({ job, response }: ArgOf<'JobSuccess'>) {
|
||||
if (response && Object.values(JobStatus).includes(response as JobStatus)) {
|
||||
|
|
@ -46,12 +114,6 @@ export class TelemetryService extends BaseService {
|
|||
this.telemetryRepository.jobs.addToCounter(jobMetric, 1);
|
||||
}
|
||||
|
||||
/**
 * Handles `JobComplete` by subtracting 1 from the
 * `immich.queues.<queue>.active` gauge, where `<queue>` is the snake_case
 * form of the queue name.
 */
@OnEvent({ name: 'JobComplete' })
onJobComplete(...[queueName]: ArgsOf<'JobComplete'>) {
  const queueKey = snakeCase(queueName);
  this.telemetryRepository.jobs.addToGauge(`immich.queues.${queueKey}.active`, -1);
}
|
||||
|
||||
@OnEvent({ name: 'QueueStart' })
|
||||
onQueueStart({ name }: ArgOf<'QueueStart'>) {
|
||||
this.telemetryRepository.jobs.addToCounter(`immich.queues.${snakeCase(name)}.started`, 1);
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ const newMetricGroupMock = () => {
|
|||
addToCounter: vitest.fn(),
|
||||
addToGauge: vitest.fn(),
|
||||
addToHistogram: vitest.fn(),
|
||||
setObservableGauge: vitest.fn(),
|
||||
configure: vitest.fn(),
|
||||
};
|
||||
};
|
||||
|
|
|
|||
Loading…
Reference in New Issue