fix(mobile): proper background task cleanup (#28694)

* event-based cancellation

wire hash cancellation

await cleanup

remove forced kill

add regression tests

abort sync requests

fix cleanup ordering in teardown

exit isolate

test background sync

test sigabrt crash

cleanup

* abort local sync
pull/28579/merge
Mert 2026-06-03 08:16:19 -04:00 committed by GitHub
parent 96d521e149
commit 963862b1b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 990 additions and 397 deletions

View File

@ -542,16 +542,17 @@ private open class MessagesPigeonCodec : StandardMessageCodec() {
/** Generated interface from Pigeon that represents a handler of messages from Flutter. */
interface NativeSyncApi {
fun shouldFullSync(): Boolean
fun getMediaChanges(): SyncDelta
fun shouldFullSync(callback: (Result<Boolean>) -> Unit)
fun getMediaChanges(callback: (Result<SyncDelta>) -> Unit)
fun checkpointSync()
fun clearSyncCheckpoint()
fun getAssetIdsForAlbum(albumId: String): List<String>
fun getAlbums(): List<PlatformAlbum>
fun getAssetIdsForAlbum(albumId: String, callback: (Result<List<String>>) -> Unit)
fun getAlbums(callback: (Result<List<PlatformAlbum>>) -> Unit)
fun getAssetsCountSince(albumId: String, timestamp: Long): Long
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List<PlatformAsset>
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?, callback: (Result<List<PlatformAsset>>) -> Unit)
fun hashAssets(assetIds: List<String>, allowNetworkAccess: Boolean, callback: (Result<List<HashResult>>) -> Unit)
fun cancelHashing()
fun cancelSync()
fun getTrashedAssets(): Map<String, List<PlatformAsset>>
fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result<Boolean>) -> Unit)
fun getCloudIdForAssetIds(assetIds: List<String>): List<CloudIdResult>
@ -570,27 +571,33 @@ interface NativeSyncApi {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync$separatedMessageChannelSuffix", codec)
if (api != null) {
channel.setMessageHandler { _, reply ->
val wrapped: List<Any?> = try {
listOf(api.shouldFullSync())
} catch (exception: Throwable) {
MessagesPigeonUtils.wrapError(exception)
api.shouldFullSync{ result: Result<Boolean> ->
val error = result.exceptionOrNull()
if (error != null) {
reply.reply(MessagesPigeonUtils.wrapError(error))
} else {
val data = result.getOrNull()
reply.reply(MessagesPigeonUtils.wrapResult(data))
}
}
reply.reply(wrapped)
}
} else {
channel.setMessageHandler(null)
}
}
run {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges$separatedMessageChannelSuffix", codec, taskQueue)
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges$separatedMessageChannelSuffix", codec)
if (api != null) {
channel.setMessageHandler { _, reply ->
val wrapped: List<Any?> = try {
listOf(api.getMediaChanges())
} catch (exception: Throwable) {
MessagesPigeonUtils.wrapError(exception)
api.getMediaChanges{ result: Result<SyncDelta> ->
val error = result.exceptionOrNull()
if (error != null) {
reply.reply(MessagesPigeonUtils.wrapError(error))
} else {
val data = result.getOrNull()
reply.reply(MessagesPigeonUtils.wrapResult(data))
}
}
reply.reply(wrapped)
}
} else {
channel.setMessageHandler(null)
@ -629,32 +636,38 @@ interface NativeSyncApi {
}
}
run {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum$separatedMessageChannelSuffix", codec, taskQueue)
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum$separatedMessageChannelSuffix", codec)
if (api != null) {
channel.setMessageHandler { message, reply ->
val args = message as List<Any?>
val albumIdArg = args[0] as String
val wrapped: List<Any?> = try {
listOf(api.getAssetIdsForAlbum(albumIdArg))
} catch (exception: Throwable) {
MessagesPigeonUtils.wrapError(exception)
api.getAssetIdsForAlbum(albumIdArg) { result: Result<List<String>> ->
val error = result.exceptionOrNull()
if (error != null) {
reply.reply(MessagesPigeonUtils.wrapError(error))
} else {
val data = result.getOrNull()
reply.reply(MessagesPigeonUtils.wrapResult(data))
}
}
reply.reply(wrapped)
}
} else {
channel.setMessageHandler(null)
}
}
run {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums$separatedMessageChannelSuffix", codec, taskQueue)
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums$separatedMessageChannelSuffix", codec)
if (api != null) {
channel.setMessageHandler { _, reply ->
val wrapped: List<Any?> = try {
listOf(api.getAlbums())
} catch (exception: Throwable) {
MessagesPigeonUtils.wrapError(exception)
api.getAlbums{ result: Result<List<PlatformAlbum>> ->
val error = result.exceptionOrNull()
if (error != null) {
reply.reply(MessagesPigeonUtils.wrapError(error))
} else {
val data = result.getOrNull()
reply.reply(MessagesPigeonUtils.wrapResult(data))
}
}
reply.reply(wrapped)
}
} else {
channel.setMessageHandler(null)
@ -679,18 +692,21 @@ interface NativeSyncApi {
}
}
run {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum$separatedMessageChannelSuffix", codec, taskQueue)
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum$separatedMessageChannelSuffix", codec)
if (api != null) {
channel.setMessageHandler { message, reply ->
val args = message as List<Any?>
val albumIdArg = args[0] as String
val updatedTimeCondArg = args[1] as Long?
val wrapped: List<Any?> = try {
listOf(api.getAssetsForAlbum(albumIdArg, updatedTimeCondArg))
} catch (exception: Throwable) {
MessagesPigeonUtils.wrapError(exception)
api.getAssetsForAlbum(albumIdArg, updatedTimeCondArg) { result: Result<List<PlatformAsset>> ->
val error = result.exceptionOrNull()
if (error != null) {
reply.reply(MessagesPigeonUtils.wrapError(error))
} else {
val data = result.getOrNull()
reply.reply(MessagesPigeonUtils.wrapResult(data))
}
}
reply.reply(wrapped)
}
} else {
channel.setMessageHandler(null)
@ -733,6 +749,22 @@ interface NativeSyncApi {
channel.setMessageHandler(null)
}
}
run {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync$separatedMessageChannelSuffix", codec)
if (api != null) {
channel.setMessageHandler { _, reply ->
val wrapped: List<Any?> = try {
api.cancelSync()
listOf(null)
} catch (exception: Throwable) {
MessagesPigeonUtils.wrapError(exception)
}
reply.reply(wrapped)
}
} else {
channel.setMessageHandler(null)
}
}
run {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$separatedMessageChannelSuffix", codec, taskQueue)
if (api != null) {

View File

@ -4,7 +4,11 @@ import android.content.Context
class NativeSyncApiImpl26(context: Context) : NativeSyncApiImplBase(context), NativeSyncApi {
override fun shouldFullSync(): Boolean {
override fun shouldFullSync(callback: (Result<Boolean>) -> Unit) {
runSync(callback) { shouldFullSync() }
}
private fun shouldFullSync(): Boolean {
return true
}
@ -18,7 +22,11 @@ class NativeSyncApiImpl26(context: Context) : NativeSyncApiImplBase(context), Na
// No-op for Android 10 and below
}
override fun getMediaChanges(): SyncDelta {
override fun getMediaChanges(callback: (Result<SyncDelta>) -> Unit) {
runSync(callback) { getMediaChanges() }
}
private fun getMediaChanges(): SyncDelta {
throw IllegalStateException("Method not supported on this Android version.")
}

View File

@ -7,6 +7,8 @@ import android.os.Bundle
import android.provider.MediaStore
import androidx.annotation.RequiresApi
import androidx.annotation.RequiresExtension
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.serialization.json.Json
@RequiresApi(Build.VERSION_CODES.Q)
@ -35,7 +37,11 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na
}
}
override fun shouldFullSync(): Boolean =
override fun shouldFullSync(callback: (Result<Boolean>) -> Unit) {
runSync(callback) { shouldFullSync() }
}
private fun shouldFullSync(): Boolean =
MediaStore.getVersion(ctx) != prefs.getString(SHARED_PREF_MEDIA_STORE_VERSION_KEY, null)
override fun checkpointSync() {
@ -49,7 +55,11 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na
}
}
override fun getMediaChanges(): SyncDelta {
override fun getMediaChanges(callback: (Result<SyncDelta>) -> Unit) {
runSync(callback) { getMediaChanges() }
}
private suspend fun getMediaChanges(): SyncDelta {
val genMap = getSavedGenerationMap()
val currentVolumes = MediaStore.getExternalVolumeNames(ctx)
val changed = mutableListOf<PlatformAsset>()
@ -58,6 +68,7 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na
var hasChanges = genMap.keys != currentVolumes
for (volume in currentVolumes) {
currentCoroutineContext().ensureActive()
val currentGen = MediaStore.getGeneration(ctx, volume)
val storedGen = genMap[volume] ?: 0
if (currentGen <= storedGen) {

View File

@ -45,12 +45,14 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
private val ctx: Context = context.applicationContext
private var hashTask: Job? = null
private var syncJob: Job? = null
private val mediaTrashDelegate = MediaTrashDelegate(ctx)
companion object {
private const val MAX_CONCURRENT_HASH_OPERATIONS = 16
private val hashSemaphore = Semaphore(MAX_CONCURRENT_HASH_OPERATIONS)
private const val HASHING_CANCELLED_CODE = "HASH_CANCELLED"
private const val SYNC_CANCELLED_CODE = "SYNC_CANCELLED"
// MediaStore.Files.FileColumns.SPECIAL_FORMAT — S Extensions 21+
// https://developer.android.com/reference/android/provider/MediaStore.Files.FileColumns#SPECIAL_FORMAT
@ -295,7 +297,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
return PlatformAssetPlaybackStyle.IMAGE
}
fun getAlbums(): List<PlatformAlbum> {
fun getAlbums(callback: (Result<List<PlatformAlbum>>) -> Unit) {
runSync(callback) { getAlbums() }
}
private suspend fun getAlbums(): List<PlatformAlbum> {
val albums = mutableListOf<PlatformAlbum>()
val albumsCount = mutableMapOf<String, Int>()
@ -322,6 +328,7 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
cursor.getColumnIndexOrThrow(MediaStore.Files.FileColumns.DATE_MODIFIED)
while (cursor.moveToNext()) {
currentCoroutineContext().ensureActive()
val id = cursor.getString(bucketIdColumn)
val count = albumsCount.getOrDefault(id, 0)
@ -342,7 +349,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
.sortedBy { it.id }
}
fun getAssetIdsForAlbum(albumId: String): List<String> {
fun getAssetIdsForAlbum(albumId: String, callback: (Result<List<String>>) -> Unit) {
runSync(callback) { getAssetIdsForAlbum(albumId) }
}
private fun getAssetIdsForAlbum(albumId: String): List<String> {
val projection = arrayOf(MediaStore.MediaColumns._ID)
return getCursor(
@ -366,7 +377,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
)?.use { cursor -> cursor.count.toLong() } ?: 0L
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List<PlatformAsset> {
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?, callback: (Result<List<PlatformAsset>>) -> Unit) {
runSync(callback) { getAssetsForAlbum(albumId, updatedTimeCond) }
}
private fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List<PlatformAsset> {
var selection = "$BUCKET_SELECTION AND $MEDIA_SELECTION"
val selectionArgs = mutableListOf(albumId, *MEDIA_SELECTION_ARGS)
@ -451,6 +466,24 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
hashTask = null
}
fun cancelSync() {
syncJob?.cancel()
syncJob = null
}
protected fun <T> runSync(callback: (Result<T>) -> Unit, work: suspend () -> T) {
syncJob?.cancel()
syncJob = CoroutineScope(Dispatchers.IO).launch {
try {
completeWhenActive(callback, Result.success(work()))
} catch (e: CancellationException) {
completeWhenActive(callback, Result.failure(FlutterError(SYNC_CANCELLED_CODE, "Sync cancelled", null)))
} catch (e: Exception) {
completeWhenActive(callback, Result.failure(e))
}
}
}
fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result<Boolean>) -> Unit) {
mediaTrashDelegate.restoreFromTrashById(mediaId, type) { completeWhenActive(callback, it) }
}

View File

@ -0,0 +1,154 @@
import 'dart:async';
import 'package:drift/drift.dart' show Value;
import 'package:flutter_test/flutter_test.dart';
import 'package:immich_mobile/domain/models/store.model.dart';
import 'package:immich_mobile/domain/utils/background_sync.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/main.dart' as app;
import 'package:immich_mobile/services/api.service.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:integration_test/integration_test.dart';
import 'package:openapi/api.dart';
import 'test_utils/fake_immich_server.dart';
void main() {
final binding = IntegrationTestWidgetsFlutterBinding.ensureInitialized();
// These tests do real I/O without pumping a widget tree, so disable the fake async clock
binding.framePolicy = LiveTestWidgetsFlutterBindingFramePolicy.fullyLive;
late Drift drift;
late FakeImmichServer server;
setUpAll(() async {
await app.initApp();
(drift, _) = await Bootstrap.initDomain();
});
setUp(() async {
await workerManagerPatch.init(dynamicSpawning: true);
server = await FakeImmichServer.start();
await ApiService().resolveAndSetEndpoint(server.endpoint);
await drift.delete(drift.userEntity).go();
await Store.delete(StoreKey.syncMigrationStatus);
});
tearDown(() async {
await workerManagerPatch.dispose();
await server.close();
await Store.delete(StoreKey.serverEndpoint);
await Store.delete(StoreKey.syncMigrationStatus);
});
void sendUser(SyncStream stream, String id, String name) {
stream.send(
type: SyncEntityType.userV1.value,
data: SyncUserV1(
id: id,
name: name,
email: '$id@test.com',
hasProfileImage: false,
deletedAt: null,
profileChangedAt: DateTime.utc(2025),
).toJson(),
ack: id,
);
}
Future<bool> dbReadable() async {
try {
await drift.customSelect('SELECT 1').get().timeout(const Duration(seconds: 5));
return true;
} catch (_) {
return false;
}
}
Future<int> userCount() async => (await drift.select(drift.userEntity).get()).length;
// Starts a remote sync and resolves once its /sync/stream request is open.
Future<(Future<bool>, SyncStream)> startSync() async {
final sync = BackgroundSyncManager().syncRemote();
final stream = await server.streamOpened.timeout(
const Duration(seconds: 30),
onTimeout: () => fail('sync isolate never opened /sync/stream'),
);
return (sync, stream);
}
testWidgets('a full sync ingests streamed events into the shared DB', (tester) async {
expect(await userCount(), 0);
final (sync, stream) = await startSync();
sendUser(stream, 'u1', 'Alice');
sendUser(stream, 'u2', 'Bob');
await stream.close();
final result = await sync.timeout(
const Duration(seconds: 30),
onTimeout: () => fail('sync did not complete after the stream ended'),
);
expect(result, isTrue);
expect(await userCount(), 2);
expect(server.ackRequests, greaterThan(0));
});
testWidgets('disposing the pool during an in-flight sync drains promptly', (tester) async {
final (sync, _) = await startSync();
final sw = Stopwatch()..start();
await workerManagerPatch.dispose().timeout(
const Duration(seconds: 15),
onTimeout: () => fail('dispose() hung — worker did not drain and exit'),
);
expect(sw.elapsed, lessThan(const Duration(seconds: 10)), reason: 'abort-driven, not socket-timeout bound');
expect(await sync.timeout(const Duration(seconds: 5), onTimeout: () => false), isFalse);
});
testWidgets('tearing down a worker blocked mid-write leaves the DB usable', (tester) async {
final (sync, stream) = await startSync();
// Hold an exclusive write transaction so the worker's write is blocked. The lock is taken only
// after the stream opens to avoid blocking the worker's own startup DB reads.
final releaseTxn = Completer<void>();
final txnHeld = Completer<void>();
final txn = drift.transaction(() async {
await drift.into(drift.userEntity).insert(
UserEntityCompanion.insert(
id: 'holder',
name: 'holder',
email: 'holder@test.com',
hasProfileImage: const Value(false),
profileChangedAt: Value(DateTime.utc(2025)),
),
);
txnHeld.complete();
await releaseTxn.future;
});
await txnHeld.future;
sendUser(stream, 'u1', 'Alice');
await stream.close();
// dispose() can only finish once the worker unwinds, which is blocked on the
// lock so start it, release the lock, then await completion.
final disposed = workerManagerPatch.dispose();
releaseTxn.complete();
await txn;
await disposed.timeout(
const Duration(seconds: 15),
onTimeout: () => fail('dispose() hung after releasing the write lock'),
);
await sync.timeout(const Duration(seconds: 5), onTimeout: () => false);
expect(await dbReadable(), isTrue);
final users = await drift.select(drift.userEntity).get();
expect(users.map((u) => u.id), contains('holder'));
});
}

View File

@ -0,0 +1,115 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
/// A dummy localhost server that implements only the endpoints that remote-sync touches.
class FakeImmichServer {
FakeImmichServer._(this._server, this.version);
final HttpServer _server;
final (int, int, int) version;
final Completer<SyncStream> _streamOpened = Completer<SyncStream>();
int ackRequests = 0;
String get endpoint => 'http://${_server.address.host}:${_server.port}/api';
/// Resolves when the sync isolate opens `POST /sync/stream`.
Future<SyncStream> get streamOpened => _streamOpened.future;
static Future<FakeImmichServer> start({(int, int, int) version = (3, 0, 0)}) async {
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
final fake = FakeImmichServer._(server, version);
fake._listen();
return fake;
}
void _listen() {
// A connection torn down mid-write during teardown is expected
_server.listen((request) => unawaited(_route(request).catchError((_) {})));
}
Future<void> _route(HttpRequest request) async {
final method = request.method;
final path = request.uri.path;
if (method == 'GET' && path == '/api/server/ping') {
return _respondJson(request, {'res': 'pong'});
}
if (method == 'GET' && path == '/api/server/version') {
final (major, minor, patch) = version;
return _respondJson(request, {'major': major, 'minor': minor, 'patch': patch});
}
if (path == '/api/sync/ack') {
if (method != 'DELETE') {
ackRequests++;
}
return _respondEmpty(request);
}
if (method == 'POST' && path == '/api/sync/stream') {
return _openSyncStream(request);
}
return _respondEmpty(request, status: HttpStatus.notFound);
}
Future<void> _openSyncStream(HttpRequest request) async {
await request.drain<void>();
request.response
..statusCode = HttpStatus.ok
..headers.contentType = ContentType('application', 'jsonlines+json')
..contentLength = -1 // chunked: stays open to stream incrementally
..bufferOutput = false;
// Flush headers so the client's send() resolves and enters its read loop.
await request.response.flush();
if (!_streamOpened.isCompleted) {
_streamOpened.complete(SyncStream._(request.response));
}
}
Future<void> _respondJson(HttpRequest request, Object body) async {
await request.drain<void>();
request.response
..statusCode = HttpStatus.ok
..headers.contentType = ContentType.json
..write(jsonEncode(body));
await request.response.close();
}
Future<void> _respondEmpty(HttpRequest request, {int status = HttpStatus.ok}) async {
await request.drain<void>();
request.response.statusCode = status;
await request.response.close();
}
Future<void> close() async {
if (_streamOpened.isCompleted) {
await (await _streamOpened.future).close();
}
await _server.close(force: true);
}
}
/// Handle to the open `/sync/stream` response: push jsonlines events, then end.
class SyncStream {
SyncStream._(this._response);
final HttpResponse _response;
bool _closed = false;
/// [data] should be a Sync*V1 DTO's `toJson()` so the parser's `fromJson` round-trips it.
void send({required String type, required Object data, required String ack}) {
if (_closed) {
return;
}
_response.write('${jsonEncode({'type': type, 'data': data, 'ack': ack})}\n');
}
Future<void> close() async {
if (_closed) {
return;
}
_closed = true;
await _response.close();
}
}

View File

@ -121,8 +121,8 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
/**
* Cancels the currently running background task, either due to timeout or external request.
* Sends a cancel signal to the Flutter side and sets up a fallback timer to ensure
* the completion handler is eventually called even if Flutter doesn't respond.
* Only tears down the engine after Dart confirms it's drained. If Dart overruns iOS's grace window,
* the expiration handler still calls setTaskCompleted and iOS suspends us.
*/
func close() {
if isComplete {
@ -132,12 +132,6 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
flutterApi?.cancel { result in
self.complete(success: false)
}
// Fallback safety mechanism: ensure completion is called within 2 seconds
// This prevents the background task from hanging indefinitely if Flutter doesn't respond
Timer.scheduledTimer(withTimeInterval: 2, repeats: false) { _ in
self.complete(success: false)
}
}

View File

@ -526,16 +526,17 @@ class MessagesPigeonCodec: FlutterStandardMessageCodec, @unchecked Sendable {
/// Generated protocol from Pigeon that represents a handler of messages from Flutter.
protocol NativeSyncApi {
func shouldFullSync() throws -> Bool
func getMediaChanges() throws -> SyncDelta
func shouldFullSync(completion: @escaping (Result<Bool, Error>) -> Void)
func getMediaChanges(completion: @escaping (Result<SyncDelta, Error>) -> Void)
func checkpointSync() throws
func clearSyncCheckpoint() throws
func getAssetIdsForAlbum(albumId: String) throws -> [String]
func getAlbums() throws -> [PlatformAlbum]
func getAssetIdsForAlbum(albumId: String, completion: @escaping (Result<[String], Error>) -> Void)
func getAlbums(completion: @escaping (Result<[PlatformAlbum], Error>) -> Void)
func getAssetsCountSince(albumId: String, timestamp: Int64) throws -> Int64
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset]
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?, completion: @escaping (Result<[PlatformAsset], Error>) -> Void)
func hashAssets(assetIds: [String], allowNetworkAccess: Bool, completion: @escaping (Result<[HashResult], Error>) -> Void)
func cancelHashing() throws
func cancelSync() throws
func getTrashedAssets() throws -> [String: [PlatformAsset]]
func restoreFromTrashById(mediaId: String, type: Int64, completion: @escaping (Result<Bool, Error>) -> Void)
func getCloudIdForAssetIds(assetIds: [String]) throws -> [CloudIdResult]
@ -555,26 +556,28 @@ class NativeSyncApiSetup {
let shouldFullSyncChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
if let api = api {
shouldFullSyncChannel.setMessageHandler { _, reply in
do {
let result = try api.shouldFullSync()
reply(wrapResult(result))
} catch {
reply(wrapError(error))
api.shouldFullSync { result in
switch result {
case .success(let res):
reply(wrapResult(res))
case .failure(let error):
reply(wrapError(error))
}
}
}
} else {
shouldFullSyncChannel.setMessageHandler(nil)
}
let getMediaChangesChannel = taskQueue == nil
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
let getMediaChangesChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
if let api = api {
getMediaChangesChannel.setMessageHandler { _, reply in
do {
let result = try api.getMediaChanges()
reply(wrapResult(result))
} catch {
reply(wrapError(error))
api.getMediaChanges { result in
switch result {
case .success(let res):
reply(wrapResult(res))
case .failure(let error):
reply(wrapError(error))
}
}
}
} else {
@ -606,33 +609,33 @@ class NativeSyncApiSetup {
} else {
clearSyncCheckpointChannel.setMessageHandler(nil)
}
let getAssetIdsForAlbumChannel = taskQueue == nil
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
let getAssetIdsForAlbumChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
if let api = api {
getAssetIdsForAlbumChannel.setMessageHandler { message, reply in
let args = message as! [Any?]
let albumIdArg = args[0] as! String
do {
let result = try api.getAssetIdsForAlbum(albumId: albumIdArg)
reply(wrapResult(result))
} catch {
reply(wrapError(error))
api.getAssetIdsForAlbum(albumId: albumIdArg) { result in
switch result {
case .success(let res):
reply(wrapResult(res))
case .failure(let error):
reply(wrapError(error))
}
}
}
} else {
getAssetIdsForAlbumChannel.setMessageHandler(nil)
}
let getAlbumsChannel = taskQueue == nil
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
let getAlbumsChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
if let api = api {
getAlbumsChannel.setMessageHandler { _, reply in
do {
let result = try api.getAlbums()
reply(wrapResult(result))
} catch {
reply(wrapError(error))
api.getAlbums { result in
switch result {
case .success(let res):
reply(wrapResult(res))
case .failure(let error):
reply(wrapError(error))
}
}
}
} else {
@ -656,19 +659,19 @@ class NativeSyncApiSetup {
} else {
getAssetsCountSinceChannel.setMessageHandler(nil)
}
let getAssetsForAlbumChannel = taskQueue == nil
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
let getAssetsForAlbumChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
if let api = api {
getAssetsForAlbumChannel.setMessageHandler { message, reply in
let args = message as! [Any?]
let albumIdArg = args[0] as! String
let updatedTimeCondArg: Int64? = nilOrValue(args[1])
do {
let result = try api.getAssetsForAlbum(albumId: albumIdArg, updatedTimeCond: updatedTimeCondArg)
reply(wrapResult(result))
} catch {
reply(wrapError(error))
api.getAssetsForAlbum(albumId: albumIdArg, updatedTimeCond: updatedTimeCondArg) { result in
switch result {
case .success(let res):
reply(wrapResult(res))
case .failure(let error):
reply(wrapError(error))
}
}
}
} else {
@ -707,6 +710,19 @@ class NativeSyncApiSetup {
} else {
cancelHashingChannel.setMessageHandler(nil)
}
let cancelSyncChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
if let api = api {
cancelSyncChannel.setMessageHandler { _, reply in
do {
try api.cancelSync()
reply(wrapResult(nil))
} catch {
reply(wrapError(error))
}
}
} else {
cancelSyncChannel.setMessageHandler(nil)
}
let getTrashedAssetsChannel = taskQueue == nil
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)

View File

@ -39,6 +39,9 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
private static let hashCancelledCode = "HASH_CANCELLED"
private static let hashCancelled = Result<[HashResult], Error>.failure(PigeonError(code: hashCancelledCode, message: "Hashing cancelled", details: nil))
private var syncTask: Task<Void?, Error>?
private static let syncCancelledCode = "SYNC_CANCELLED"
private static let syncCancelled = PigeonError(code: syncCancelledCode, message: "Sync cancelled", details: nil)
init(with defaults: UserDefaults = .standard) {
self.defaults = defaults
@ -71,7 +74,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
saveChangeToken(token: PHPhotoLibrary.shared().currentChangeToken)
}
func shouldFullSync() -> Bool {
func shouldFullSync(completion: @escaping (Result<Bool, Error>) -> Void) {
runSync(completion) { $0.shouldFullSync() }
}
private func shouldFullSync() -> Bool {
guard #available(iOS 16, *),
PHPhotoLibrary.authorizationStatus(for: .readWrite) == .authorized,
let storedToken = getChangeToken() else {
@ -87,12 +94,17 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
return false
}
func getAlbums() throws -> [PlatformAlbum] {
func getAlbums(completion: @escaping (Result<[PlatformAlbum], Error>) -> Void) {
runSync(completion) { try $0.getAlbums() }
}
private func getAlbums() throws -> [PlatformAlbum] {
var albums: [PlatformAlbum] = []
albumTypes.forEach { type in
for type in albumTypes {
let collections = PHAssetCollection.fetchAssetCollections(with: type, subtype: .any, options: nil)
for i in 0..<collections.count {
try Task.checkCancellation()
let album = collections.object(at: i)
// Ignore recovered album
@ -126,7 +138,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
return albums.sorted { $0.id < $1.id }
}
func getMediaChanges() throws -> SyncDelta {
func getMediaChanges(completion: @escaping (Result<SyncDelta, Error>) -> Void) {
runSync(completion) { try $0.getMediaChanges() }
}
private func getMediaChanges() throws -> SyncDelta {
guard #available(iOS 16, *) else {
throw PigeonError(code: "UNSUPPORTED_OS", message: "This feature requires iOS 16 or later.", details: nil)
}
@ -146,51 +162,49 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
return SyncDelta(hasChanges: false, updates: [], deletes: [], assetAlbums: [:])
}
do {
let changes = try PHPhotoLibrary.shared().fetchPersistentChanges(since: storedToken)
let changes = try PHPhotoLibrary.shared().fetchPersistentChanges(since: storedToken)
var updatedAssets: Set<AssetWrapper> = []
var deletedAssets: Set<String> = []
for change in changes {
try Task.checkCancellation()
guard let details = try? change.changeDetails(for: PHObjectType.asset) else { continue }
var updatedAssets: Set<AssetWrapper> = []
var deletedAssets: Set<String> = []
let updated = details.updatedLocalIdentifiers.union(details.insertedLocalIdentifiers)
deletedAssets.formUnion(details.deletedLocalIdentifiers)
for change in changes {
guard let details = try? change.changeDetails(for: PHObjectType.asset) else { continue }
if (updated.isEmpty) { continue }
let options = PHFetchOptions()
options.includeHiddenAssets = false
let result = PHAsset.fetchAssets(withLocalIdentifiers: Array(updated), options: options)
for i in 0..<result.count {
let asset = result.object(at: i)
let updated = details.updatedLocalIdentifiers.union(details.insertedLocalIdentifiers)
deletedAssets.formUnion(details.deletedLocalIdentifiers)
if (updated.isEmpty) { continue }
let options = PHFetchOptions()
options.includeHiddenAssets = false
let result = PHAsset.fetchAssets(withLocalIdentifiers: Array(updated), options: options)
for i in 0..<result.count {
let asset = result.object(at: i)
// Asset wrapper only uses the id for comparison. Multiple change can contain the same asset, skip duplicate changes
let predicate = PlatformAsset(
id: asset.localIdentifier,
name: "",
type: 0,
durationMs: 0,
orientation: 0,
isFavorite: false,
playbackStyle: .unknown
)
if (updatedAssets.contains(AssetWrapper(with: predicate))) {
continue
}
let domainAsset = AssetWrapper(with: asset.toPlatformAsset())
updatedAssets.insert(domainAsset)
// Asset wrapper only uses the id for comparison. Multiple change can contain the same asset, skip duplicate changes
let predicate = PlatformAsset(
id: asset.localIdentifier,
name: "",
type: 0,
durationMs: 0,
orientation: 0,
isFavorite: false,
playbackStyle: .unknown
)
if (updatedAssets.contains(AssetWrapper(with: predicate))) {
continue
}
let domainAsset = AssetWrapper(with: asset.toPlatformAsset())
updatedAssets.insert(domainAsset)
}
let updates = Array(updatedAssets.map { $0.asset })
return SyncDelta(hasChanges: true, updates: updates, deletes: Array(deletedAssets), assetAlbums: buildAssetAlbumsMap(assets: updates))
}
let updates = Array(updatedAssets.map { $0.asset })
return SyncDelta(hasChanges: true, updates: updates, deletes: Array(deletedAssets), assetAlbums: buildAssetAlbumsMap(assets: updates))
}
private func buildAssetAlbumsMap(assets: Array<PlatformAsset>) -> [String: [String]] {
guard !assets.isEmpty else {
return [:]
@ -213,7 +227,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
return albumAssets
}
func getAssetIdsForAlbum(albumId: String) throws -> [String] {
func getAssetIdsForAlbum(albumId: String, completion: @escaping (Result<[String], Error>) -> Void) {
runSync(completion) { try $0.getAssetIdsForAlbum(albumId: albumId) }
}
private func getAssetIdsForAlbum(albumId: String) throws -> [String] {
let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil)
guard let album = collections.firstObject else {
return []
@ -223,9 +241,14 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
let options = PHFetchOptions()
options.includeHiddenAssets = false
let assets = getAssetsFromAlbum(in: album, options: options)
assets.enumerateObjects { (asset, _, _) in
assets.enumerateObjects { (asset, _, stop) in
if Task.isCancelled {
stop.pointee = true
return
}
ids.append(asset.localIdentifier)
}
try Task.checkCancellation()
return ids
}
@ -243,7 +266,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
return Int64(assets.count)
}
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset] {
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?, completion: @escaping (Result<[PlatformAsset], Error>) -> Void) {
runSync(completion) { try $0.getAssetsForAlbum(albumId: albumId, updatedTimeCond: updatedTimeCond) }
}
private func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset] {
let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil)
guard let album = collections.firstObject else {
return []
@ -262,9 +289,14 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
}
var assets: [PlatformAsset] = []
result.enumerateObjects { (asset, _, _) in
result.enumerateObjects { (asset, _, stop) in
if Task.isCancelled {
stop.pointee = true
return
}
assets.append(asset.toPlatformAsset())
}
try Task.checkCancellation()
return assets
}
@ -324,6 +356,31 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
hashTask = nil
}
func cancelSync() {
syncTask?.cancel()
syncTask = nil
}
private func runSync<T>(
_ completion: @escaping (Result<T, Error>) -> Void,
_ work: @escaping (NativeSyncApiImpl) throws -> T
) {
syncTask?.cancel()
syncTask = Task { [weak self] in
guard let self else { return nil }
let result: Result<T, Error>
do {
result = .success(try work(self))
} catch is CancellationError {
result = .failure(Self.syncCancelled)
} catch {
result = .failure(error)
}
self.completeWhenActive(for: completion, with: result)
return nil
}
}
private func hashAsset(_ asset: PHAsset, allowNetworkAccess: Bool) async -> HashResult? {
class RequestRef {
var id: PHAssetResourceDataRequestID?

View File

@ -188,20 +188,14 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
if (!_cancellationToken.isCompleted) {
_cancellationToken.complete();
}
final cleanupFutures = [
nativeSyncApi?.cancelHashing(),
workerManagerPatch.dispose().catchError((_) async {
// Discard any errors on the dispose call
return;
}),
LogService.I.dispose(),
Store.dispose(),
backgroundSyncManager?.cancel(),
_drift.optimize(allTables: true),
];
await Future.wait(cleanupFutures.nonNulls);
// Workers share one sqlite connection, so DB teardown must wait until every worker has stopped using it.
await Future.wait([
if (backgroundSyncManager != null) backgroundSyncManager.cancel(),
if (nativeSyncApi != null) nativeSyncApi.cancelHashing(),
]);
await workerManagerPatch.dispose().catchError((_) async {});
await Future.wait([LogService.I.dispose(), Store.dispose(), _drift.optimize(allTables: true)]);
await _drift.close();
await _driftLogger.close();

View File

@ -1,3 +1,5 @@
import 'dart:async';
import 'package:flutter/services.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
@ -17,7 +19,7 @@ class HashService {
final DriftLocalAssetRepository _localAssetRepository;
final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository;
final NativeSyncApi _nativeSyncApi;
final bool Function()? _cancelChecker;
final Completer<void>? _cancellation;
final _log = Logger('HashService');
HashService({
@ -25,11 +27,15 @@ class HashService {
required this._localAssetRepository,
required this._trashedLocalAssetRepository,
required this._nativeSyncApi,
this._cancelChecker,
this._cancellation,
int? batchSize,
}) : _batchSize = batchSize ?? kBatchHashFileLimit;
}) : _batchSize = batchSize ?? kBatchHashFileLimit {
// Stop the in-flight native hash call promptly on cancellation; the loops
// below also observe [isCancelled] to bail between batches.
_cancellation?.future.then((_) => _nativeSyncApi.cancelHashing().onError(_log.warning));
}
bool get isCancelled => _cancelChecker?.call() ?? false;
bool get isCancelled => _cancellation?.isCompleted ?? false;
Future<void> hashAssets() async {
_log.info("Starting hashing of assets");

View File

@ -2,6 +2,7 @@ import 'dart:async';
import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
import 'package:immich_mobile/domain/models/store.model.dart';
@ -17,6 +18,8 @@ import 'package:immich_mobile/utils/datetime_helpers.dart';
import 'package:immich_mobile/utils/diff.dart';
import 'package:logging/logging.dart';
const String _kSyncCancelledCode = "SYNC_CANCELLED";
class LocalSyncService {
final DriftLocalAlbumRepository _localAlbumRepository;
// ignore: unused_field
@ -25,6 +28,7 @@ class LocalSyncService {
final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository;
final AssetMediaRepository _assetMediaRepository;
final IPermissionRepository _permissionRepository;
final Completer<void>? _cancellation;
final Logger _log = Logger("DeviceSyncService");
LocalSyncService({
@ -34,7 +38,12 @@ class LocalSyncService {
required this._trashedLocalAssetRepository,
required this._assetMediaRepository,
required this._permissionRepository,
});
this._cancellation,
}) {
_cancellation?.future.then((_) => _nativeSyncApi.cancelSync().onError(_log.warning));
}
bool get _isCancelled => _cancellation?.isCompleted ?? false;
Future<void> sync({bool full = false}) async {
final Stopwatch stopwatch = Stopwatch()..start();
@ -81,6 +90,10 @@ class LocalSyncService {
// detect album deletions from the native side
if (CurrentPlatform.isAndroid) {
for (final album in dbAlbums) {
if (_isCancelled) {
_log.warning("Local sync cancelled. Stopped processing albums.");
return;
}
final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id);
await _localAlbumRepository.syncDeletes(album.id, deviceIds);
}
@ -91,6 +104,10 @@ class LocalSyncService {
// does not include changes for cloud albums.
final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums();
for (final album in cloudAlbums) {
if (_isCancelled) {
_log.warning("Local sync cancelled. Stopped processing cloud albums.");
return;
}
final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id);
if (dbAlbum == null) {
_log.warning("Cloud album ${album.name} not found in local database. Skipping sync.");
@ -102,6 +119,12 @@ class LocalSyncService {
await _mapIosCloudIds(newAssets);
}
await _nativeSyncApi.checkpointSync();
} on PlatformException catch (e, s) {
if (e.code == _kSyncCancelledCode) {
_log.warning("Local sync cancelled");
} else {
_log.severe("Error performing device sync", e, s);
}
} catch (e, s) {
_log.severe("Error performing device sync", e, s);
} finally {
@ -129,12 +152,21 @@ class LocalSyncService {
await _nativeSyncApi.checkpointSync();
stopwatch.stop();
_log.info("Full device sync took - ${stopwatch.elapsedMilliseconds}ms");
} on PlatformException catch (e, s) {
if (e.code == _kSyncCancelledCode) {
_log.warning("Full device sync cancelled");
} else {
_log.severe("Error performing full device sync", e, s);
}
} catch (e, s) {
_log.severe("Error performing full device sync", e, s);
}
}
Future<void> addAlbum(LocalAlbum album) async {
if (_isCancelled) {
return;
}
try {
_log.fine("Adding device album ${album.name}");
@ -162,6 +194,9 @@ class LocalSyncService {
// The deviceAlbum is ignored since we are going to refresh it anyways
FutureOr<bool> updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum) async {
if (_isCancelled) {
return false;
}
try {
_log.fine("Syncing device album ${dbAlbum.name}");

View File

@ -112,10 +112,16 @@ class LogService {
return _flushBuffer();
}
Future<void> dispose() {
Future<void> dispose() async {
_flushTimer?.cancel();
_logSubscription.cancel();
return _flushBuffer();
_flushTimer = null;
await _logSubscription.cancel();
await _flushBuffer();
// Allow a subsequent init() (e.g. when a worker isolate is reused) to
// create a fresh instance instead of returning this disposed one.
if (identical(_instance, this)) {
_instance = null;
}
}
Future<void> _flushBuffer() async {

View File

@ -54,7 +54,13 @@ class StoreService {
/// Disposes the store and cancels the subscription. To reuse the store call init() again
Future<void> dispose() async {
await _storeUpdateSubscription?.cancel();
_storeUpdateSubscription = null;
_cache.clear();
// Allow a subsequent init() (e.g. when a worker isolate is reused) to
// create a fresh instance instead of returning this disposed one.
if (identical(_instance, this)) {
_instance = null;
}
}
/// Returns the cached value for [key], or `null`

View File

@ -1,3 +1,5 @@
import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/domain/models/store.model.dart';
@ -5,6 +7,7 @@ import 'package:immich_mobile/domain/services/store.service.dart';
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/remote_album.repository.dart';
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/store.provider.dart';
import 'package:immich_mobile/repositories/drift_album_api_repository.dart';
import 'package:immich_mobile/utils/debug_print.dart';
@ -16,6 +19,7 @@ final syncLinkedAlbumServiceProvider = Provider(
ref.watch(remoteAlbumRepository),
ref.watch(driftAlbumApiRepositoryProvider),
ref.watch(storeServiceProvider),
cancellation: ref.watch(cancellationProvider),
),
);
@ -24,13 +28,15 @@ class SyncLinkedAlbumService {
final DriftRemoteAlbumRepository _remoteAlbumRepository;
final DriftAlbumApiRepository _albumApiRepository;
final StoreService _storeService;
final Completer<void>? _cancellation;
SyncLinkedAlbumService(
this._localAlbumRepository,
this._remoteAlbumRepository,
this._albumApiRepository,
this._storeService,
);
this._storeService, {
this._cancellation,
});
final _log = Logger("SyncLinkedAlbumService");
@ -55,7 +61,11 @@ class SyncLinkedAlbumService {
final assetIds = await _remoteAlbumRepository.getLinkedAssetIds(userId, localAlbum.id, linkedRemoteAlbumId);
_log.fine("Syncing ${assetIds.length} assets to remote album: ${remoteAlbum.name}");
if (assetIds.isNotEmpty) {
final album = await _albumApiRepository.addAssets(remoteAlbum.id, assetIds);
final album = await _albumApiRepository.addAssets(
remoteAlbum.id,
assetIds,
abortTrigger: _cancellation?.future,
);
await _remoteAlbumRepository.addAssets(remoteAlbum.id, album.added);
}
}),

View File

@ -38,7 +38,7 @@ class SyncStreamService {
final IPermissionRepository _permissionRepository;
final SyncMigrationRepository _syncMigrationRepository;
final ApiService _api;
final bool Function()? _cancelChecker;
final Completer<void>? _cancellation;
SyncStreamService({
required this._syncApiRepository,
@ -49,10 +49,10 @@ class SyncStreamService {
required this._permissionRepository,
required this._syncMigrationRepository,
required this._api,
this._cancelChecker,
this._cancellation,
});
bool get isCancelled => _cancelChecker?.call() ?? false;
bool get isCancelled => _cancellation?.isCompleted ?? false;
Future<bool> sync() async {
_logger.info("Remote sync request for user");
@ -80,10 +80,15 @@ class SyncStreamService {
_handleEvents,
serverVersion: serverSemVer,
onReset: () => shouldReset = true,
abortSignal: _cancellation?.future,
);
if (shouldReset) {
_logger.info("Resetting sync state as requested by server");
await _syncApiRepository.streamChanges(_handleEvents, serverVersion: serverSemVer);
await _syncApiRepository.streamChanges(
_handleEvents,
serverVersion: serverSemVer,
abortSignal: _cancellation?.future,
);
}
previousLength = migrations.length;
@ -318,7 +323,7 @@ class SyncStreamService {
}
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async {
if (batchData.isEmpty) {
if (batchData.isEmpty || isCancelled) {
return;
}
@ -361,7 +366,7 @@ class SyncStreamService {
}
Future<void> handleWsAssetUploadReadyV2Batch(List<dynamic> batchData) async {
if (batchData.isEmpty) {
if (batchData.isEmpty || isCancelled) {
return;
}
@ -404,6 +409,9 @@ class SyncStreamService {
}
Future<void> handleWsAssetEditReadyV1(dynamic data) async {
if (isCancelled) {
return;
}
_logger.info('Processing AssetEditReadyV1 event');
try {
@ -444,6 +452,9 @@ class SyncStreamService {
}
Future<void> handleWsAssetEditReadyV2(dynamic data) async {
if (isCancelled) {
return;
}
_logger.info('Processing AssetEditReadyV2 event');
try {

View File

@ -50,53 +50,27 @@ class BackgroundSyncManager {
});
Future<void> cancel() async {
final futures = <Future>[];
if (_syncTask != null) {
futures.add(_syncTask!.future);
final tasks = [
_syncTask,
_syncWebsocketTask,
_cloudIdSyncTask,
_linkedAlbumSyncTask,
_deviceAlbumSyncTask,
_hashTask,
];
final futures = [
for (final task in tasks)
if (task != null) task.future,
];
for (final task in tasks) {
task?.cancel();
}
_syncTask?.cancel();
_syncTask = null;
if (_syncWebsocketTask != null) {
futures.add(_syncWebsocketTask!.future);
}
_syncWebsocketTask?.cancel();
_syncWebsocketTask = null;
if (_cloudIdSyncTask != null) {
futures.add(_cloudIdSyncTask!.future);
}
_cloudIdSyncTask?.cancel();
_cloudIdSyncTask = null;
if (_linkedAlbumSyncTask != null) {
futures.add(_linkedAlbumSyncTask!.future);
}
_linkedAlbumSyncTask?.cancel();
_linkedAlbumSyncTask = null;
try {
await Future.wait(futures);
} on CanceledError {
// Ignore cancellation errors
}
}
Future<void> cancelLocal() async {
final futures = <Future>[];
if (_hashTask != null) {
futures.add(_hashTask!.future);
}
_hashTask?.cancel();
_hashTask = null;
if (_deviceAlbumSyncTask != null) {
futures.add(_deviceAlbumSyncTask!.future);
}
_deviceAlbumSyncTask?.cancel();
_deviceAlbumSyncTask = null;
_hashTask = null;
try {
await Future.wait(futures);

View File

@ -1,3 +1,5 @@
import 'dart:async';
import 'package:drift/drift.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
@ -9,6 +11,7 @@ import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
import 'package:immich_mobile/platform/native_sync_api.g.dart';
import 'package:immich_mobile/providers/api.provider.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
import 'package:immich_mobile/providers/server_info.provider.dart';
@ -51,9 +54,10 @@ Future<void> syncCloudIds(ProviderContainer ref) async {
}
final assetApi = ref.read(apiServiceProvider).assetsApi;
final cancellation = ref.read(cancellationProvider);
// Process cloud IDs in paginated batches
await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger);
await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger, cancellation);
}
Future<void> _processCloudIdMappingsInBatches(
@ -62,12 +66,17 @@ Future<void> _processCloudIdMappingsInBatches(
AssetsApi assetsApi,
bool canBulkUpdate,
Logger logger,
Completer<void> cancellation,
) async {
const pageSize = 20000;
String? lastLocalId;
final seenRemoteAssetIds = <String>{};
while (true) {
if (cancellation.isCompleted) {
logger.warning('Cloud ID migration cancelled. Stopping batch processing.');
break;
}
final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, lastLocalId);
if (mappings.isEmpty) {
break;
@ -98,9 +107,9 @@ Future<void> _processCloudIdMappingsInBatches(
if (items.isNotEmpty) {
if (canBulkUpdate) {
await _bulkUpdateCloudIds(assetsApi, items);
await _bulkUpdateCloudIds(assetsApi, items, cancellation.future);
} else {
await _sequentialUpdateCloudIds(assetsApi, items);
await _sequentialUpdateCloudIds(assetsApi, items, cancellation);
}
}
@ -111,20 +120,35 @@ Future<void> _processCloudIdMappingsInBatches(
}
}
Future<void> _sequentialUpdateCloudIds(AssetsApi assetsApi, List<AssetMetadataBulkUpsertItemDto> items) async {
Future<void> _sequentialUpdateCloudIds(
AssetsApi assetsApi,
List<AssetMetadataBulkUpsertItemDto> items,
Completer<void> cancellation,
) async {
for (final item in items) {
if (cancellation.isCompleted) {
break;
}
final upsertItem = AssetMetadataUpsertItemDto(key: item.key, value: item.value);
try {
await assetsApi.updateAssetMetadata(item.assetId, AssetMetadataUpsertDto(items: [upsertItem]));
await assetsApi.updateAssetMetadata(
item.assetId,
AssetMetadataUpsertDto(items: [upsertItem]),
abortTrigger: cancellation.future,
);
} catch (error, stack) {
Logger('migrateCloudIds').warning('Failed to update metadata for asset ${item.assetId}', error, stack);
}
}
}
Future<void> _bulkUpdateCloudIds(AssetsApi assetsApi, List<AssetMetadataBulkUpsertItemDto> items) async {
Future<void> _bulkUpdateCloudIds(
AssetsApi assetsApi,
List<AssetMetadataBulkUpsertItemDto> items,
Future<void> abortTrigger,
) async {
try {
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items));
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items), abortTrigger: abortTrigger);
} catch (error, stack) {
Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack);
}

View File

@ -29,6 +29,7 @@ class SyncApiRepository {
Function()? onReset,
int batchSize = kSyncEventBatchSize,
http.Client? httpClient,
Future<void>? abortSignal,
}) async {
final stopwatch = Stopwatch()..start();
final client = httpClient ?? NetworkRepository.client;
@ -36,7 +37,7 @@ class SyncApiRepository {
final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'};
final request = http.Request('POST', Uri.parse(endpoint));
final request = http.AbortableRequest('POST', Uri.parse(endpoint), abortTrigger: abortSignal);
request.headers.addAll(headers);
request.body = jsonEncode(
SyncStreamDto(

View File

@ -635,6 +635,20 @@ class NativeSyncApi {
_extractReplyValueOrThrow(pigeonVar_replyList, pigeonVar_channelName, isNullValid: true);
}
Future<void> cancelSync() async {
final pigeonVar_channelName =
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync$pigeonVar_messageChannelSuffix';
final pigeonVar_channel = BasicMessageChannel<Object?>(
pigeonVar_channelName,
pigeonChannelCodec,
binaryMessenger: pigeonVar_binaryMessenger,
);
final Future<Object?> pigeonVar_sendFuture = pigeonVar_channel.send(null);
final pigeonVar_replyList = await pigeonVar_sendFuture as List<Object?>?;
_extractReplyValueOrThrow(pigeonVar_replyList, pigeonVar_channelName, isNullValid: true);
}
Future<Map<String, List<PlatformAsset>>> getTrashedAssets() async {
final pigeonVar_channelName =
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$pigeonVar_messageChannelSuffix';

View File

@ -1,8 +1,9 @@
import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart';
/// Provider holding a boolean function that returns true when cancellation is requested.
/// A computation running in the isolate uses the function to implement cooperative cancellation.
final cancellationProvider = Provider<bool Function()>(
/// Holds the isolate's cancellation signal.
final cancellationProvider = Provider<Completer<void>>(
// This will be overridden in the isolate's container.
// Throwing ensures it's not used without an override.
(ref) => throw UnimplementedError(

View File

@ -26,7 +26,7 @@ final syncStreamServiceProvider = Provider(
permissionRepository: ref.watch(permissionRepositoryProvider),
syncMigrationRepository: ref.watch(syncMigrationRepositoryProvider),
api: ref.watch(apiServiceProvider),
cancelChecker: ref.watch(cancellationProvider),
cancellation: ref.watch(cancellationProvider),
),
);
@ -42,6 +42,7 @@ final localSyncServiceProvider = Provider(
assetMediaRepository: ref.watch(assetMediaRepositoryProvider),
permissionRepository: ref.watch(permissionRepositoryProvider),
nativeSyncApi: ref.watch(nativeSyncApiProvider),
cancellation: ref.watch(cancellationProvider),
),
);
@ -51,5 +52,6 @@ final hashServiceProvider = Provider(
localAssetRepository: ref.watch(localAssetRepository),
nativeSyncApi: ref.watch(nativeSyncApiProvider),
trashedLocalAssetRepository: ref.watch(trashedLocalAssetRepository),
cancellation: ref.watch(cancellationProvider),
),
);

View File

@ -47,8 +47,14 @@ class DriftAlbumApiRepository extends ApiRepository {
return (removed: removed, failed: failed);
}
Future<({List<String> added, List<String> failed})> addAssets(String albumId, Iterable<String> assetIds) async {
final response = await checkNull(_api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList())));
Future<({List<String> added, List<String> failed})> addAssets(
String albumId,
Iterable<String> assetIds, {
Future<void>? abortTrigger,
}) async {
final response = await checkNull(
_api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList()), abortTrigger: abortTrigger),
);
final List<String> added = [], failed = [];
for (final dto in response) {
if (dto.success) {

View File

@ -8,10 +8,9 @@ import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:logging/logging.dart';
import 'package:worker_manager/worker_manager.dart';
import 'package:worker_manager/worker_manager.dart' show Cancelable;
class InvalidIsolateUsageException implements Exception {
const InvalidIsolateUsageException();
@ -30,50 +29,27 @@ Cancelable<T?> runInIsolateGentle<T>({
throw const InvalidIsolateUsageException();
}
return workerManagerPatch.executeGentle((cancelledChecker) async {
T? result;
await runZonedGuarded(
() async {
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
DartPluginRegistrant.ensureInitialized();
return workerManagerPatch.executeGentle((onCancel) async {
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
DartPluginRegistrant.ensureInitialized();
final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false);
final ref = ProviderContainer(
overrides: [
cancellationProvider.overrideWithValue(cancelledChecker),
driftProvider.overrideWith(driftOverride(drift)),
],
);
Logger log = Logger("IsolateLogger");
try {
result = await computation(ref);
} on CanceledError {
log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
} catch (error, stack) {
log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
} finally {
try {
ref.dispose();
await Store.dispose();
await LogService.I.dispose();
await logDb.close();
await drift.close();
} catch (error, stack) {
dPrint(() => "Error closing resources in isolate: $error, $stack");
} finally {
ref.dispose();
// Delay to ensure all resources are released
await Future.delayed(const Duration(seconds: 2));
}
}
},
(error, stack) {
dPrint(() => "Error in isolate $debugLabel zone: $error, $stack");
},
final log = Logger("IsolateLogger");
final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false);
final ref = ProviderContainer(
overrides: [cancellationProvider.overrideWithValue(onCancel), driftProvider.overrideWith(driftOverride(drift))],
);
return result;
try {
return await computation(ref);
} catch (error, stack) {
log.severe("Error in runInIsolateGentle${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
return null;
} finally {
ref.dispose();
await Store.dispose();
await LogService.I.dispose();
await logDb.close();
await drift.close();
}
});
}

View File

@ -0,0 +1,163 @@
// Forked from worker_manager's `WorkerImpl` (src/worker/worker_io.dart): a
// `CancelRequest` completes the computation's [Completer] (so it can await
// cancellation and unwind) instead of flipping a polled flag, and [shutdown]
// lets the isolate drain and exit on its own rather than force-killing it. Only
// the gentle-with-cancellation path immich uses is kept.
//
// ignore_for_file: implementation_imports
import 'dart:async';
import 'dart:isolate';
import 'package:worker_manager/src/scheduling/task.dart';
import 'package:worker_manager/src/worker/cancel_request.dart';
import 'package:worker_manager/src/worker/result.dart';
/// A worker computation that receives a [Completer] which completes on
/// cancellation: await its future to react promptly, or read `isCompleted`.
typedef GentleExecution<R> = FutureOr<R> Function(Completer<void> onCancel);
class _Shutdown {
const _Shutdown();
}
class IsolateWorker {
IsolateWorker();
Isolate? _isolate;
RawReceivePort? _receivePort;
SendPort? _sendPort;
Completer<void>? _sendPortReceived;
Completer? _result;
String? taskId;
bool get initialized => _sendPortReceived?.isCompleted ?? false;
bool get initializing {
final sendPortReceived = _sendPortReceived;
return sendPortReceived != null && !sendPortReceived.isCompleted;
}
Future<void> initialize() async {
final sendPortReceived = _sendPortReceived = Completer<void>();
final receivePort = _receivePort = RawReceivePort();
receivePort.handler = (Object message) {
if (message is SendPort) {
_sendPort = message;
sendPortReceived.complete();
} else if (message is ResultSuccess) {
_result?.complete(message.value);
_afterTask();
} else if (message is ResultError) {
_result?.completeError(message.error, message.stackTrace);
_afterTask();
}
};
_isolate = await Isolate.spawn(_isolateEntry, receivePort.sendPort, errorsAreFatal: false);
await sendPortReceived.future;
}
Future<R> work<R>(Task<R> task) async {
taskId = task.id;
final result = _result = Completer();
_sendPort!.send(task.execution);
return await (result.future as Future<R>);
}
/// Cancels the current task without retiring the worker.
void cancelGentle() => _sendPort?.send(CancelRequest());
/// Cancels any in-flight task and awaits the isolate exiting on its own no
/// force-kill, so `finally` blocks and native cleanup always run.
///
/// Detaches the slot up front so a concurrent [initialize] can revive it
/// without colliding (revival installs fresh ports while this drains the ones
/// it captured locally). A revived worker is always idle, so the still-live
/// receive-port handler can't misroute a result.
Future<void> shutdown() async {
final sendPortReceived = _sendPortReceived;
if (sendPortReceived != null && !sendPortReceived.isCompleted) {
await sendPortReceived.future;
}
final isolate = _isolate;
final receivePort = _receivePort;
final sendPort = _sendPort;
if (isolate == null || receivePort == null || sendPort == null) {
return;
}
_isolate = null;
_sendPort = null;
_sendPortReceived = null;
// Not _result: an in-flight task still delivers it before exiting; nulling
// here would drop that and hang work()'s caller.
final exited = Completer<void>();
final exitPort = RawReceivePort();
exitPort.handler = (_) {
if (!exited.isCompleted) {
exited.complete();
}
exitPort.close();
};
isolate.addOnExitListener(exitPort.sendPort);
sendPort.send(const _Shutdown());
await exited.future;
receivePort.close();
}
void _afterTask() {
taskId = null;
_result = null;
}
static void _isolateEntry(SendPort sendPort) {
final receivePort = RawReceivePort();
sendPort.send(receivePort.sendPort);
// One task at a time, so a single completer suffices; null between tasks.
Completer<void>? onCancel;
void cancel() {
if (onCancel?.isCompleted == false) {
onCancel!.complete();
}
}
var shuttingDown = false;
var running = false;
receivePort.handler = (message) async {
if (message is _Shutdown) {
shuttingDown = true;
cancel();
if (!running) {
Isolate.exit();
}
return;
}
if (message is CancelRequest) {
cancel();
return;
}
final execution = message as GentleExecution;
onCancel = Completer<void>();
running = true;
Result result;
try {
result = ResultSuccess(await execution(onCancel!));
} catch (error, stackTrace) {
result = ResultError(error, stackTrace);
} finally {
onCancel = null;
running = false;
}
if (shuttingDown) {
// An isolate that has used platform channels can't exit on its own (Flutter's BackgroundIsolateBinaryMessenger
// opens an undisposable port), so closing our ports isn't enough. Isolate.exit delivers the result as its final
// message and terminates. It's abrupt (skips pending finally/microtasks) but safe here: the computation and its
// `finally` are already done and there's no await before this, so nothing pending is skipped.
Isolate.exit(sendPort, result);
}
sendPort.send(result);
};
}
}

View File

@ -6,8 +6,8 @@ import 'dart:math';
import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/utils/isolate_worker.dart';
import 'package:worker_manager/src/number_of_processors/processors_io.dart';
import 'package:worker_manager/src/worker/worker.dart';
import 'package:worker_manager/worker_manager.dart';
final workerManagerPatch = _Executor();
@ -16,6 +16,13 @@ final workerManagerPatch = _Executor();
const _minId = -9007199254740992;
const _maxId = 9007199254740992;
class _GentleTask<R> extends Task<R> implements Gentle {
@override
final GentleExecution<R> execution;
_GentleTask({required super.id, required super.completer, required super.workPriority, required this.execution});
}
class Mixinable<T> {
late final itSelf = this as T;
}
@ -51,13 +58,13 @@ mixin _ExecutorLogger on Mixinable<_Executor> {
class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
final _queue = PriorityQueue<Task>();
final _pool = <Worker>[];
final _pool = <IsolateWorker>[];
var _nextTaskId = _minId;
var _dynamicSpawning = false;
var _isolatesCount = numberOfProcessors;
@visibleForTesting
UnmodifiableListView<Worker> get pool => UnmodifiableListView(_pool);
UnmodifiableListView<IsolateWorker> get pool => UnmodifiableListView(_pool);
@override
Future<void> init({int? isolatesCount, bool? dynamicSpawning}) async {
@ -80,117 +87,37 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
@override
Future<void> dispose() async {
_queue.clear();
for (final worker in _pool) {
if (worker.initialized || worker.initializing) {
worker.kill();
}
}
final shutdown = _pool.map((worker) => worker.shutdown()).toList(growable: false);
_pool.clear();
await Future.wait(shutdown);
super.dispose();
}
Cancelable<R> execute<R>(Execute<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
return _createCancelable<R>(execution: execution, priority: priority);
}
Cancelable<R> executeNow<R>(ExecuteGentle<R> execution) {
final task = TaskGentle<R>(
id: "",
workPriority: WorkPriority.immediately,
execution: execution,
completer: Completer<R>(),
);
Future<void> run() async {
try {
final result = await execution(() => task.canceled);
task.complete(result, null, null);
} catch (error, st) {
task.complete(null, error, st);
}
}
run();
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
}
Cancelable<R> executeWithPort<R, T>(
ExecuteWithPort<R> execution, {
WorkPriority priority = WorkPriority.immediately,
required void Function(T value) onMessage,
}) {
return _createCancelable<R>(
execution: execution,
priority: priority,
onMessage: (message) => onMessage(message as T),
);
}
Cancelable<R> executeGentle<R>(ExecuteGentle<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
return _createCancelable<R>(execution: execution, priority: priority);
}
Cancelable<R> executeGentleWithPort<R, T>(
ExecuteGentleWithPort<R> execution, {
WorkPriority priority = WorkPriority.immediately,
required void Function(T value) onMessage,
}) {
return _createCancelable<R>(
execution: execution,
priority: priority,
onMessage: (message) => onMessage(message as T),
);
}
void _createWorkers() {
for (var i = 0; i < _isolatesCount; i++) {
_pool.add(Worker());
}
}
Future<void> _initializeWorkers() async {
await Future.wait(_pool.map((e) => e.initialize()));
}
Cancelable<R> _createCancelable<R>({
required Function execution,
WorkPriority priority = WorkPriority.immediately,
void Function(Object value)? onMessage,
}) {
/// Runs [execution] on a worker isolate; its [Completer] completes when the
/// returned [Cancelable] is cancelled.
Cancelable<R> executeGentle<R>(GentleExecution<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
if (_nextTaskId + 1 == _maxId) {
_nextTaskId = _minId;
}
final id = _nextTaskId.toString();
_nextTaskId++;
late final Task<R> task;
final completer = Completer<R>();
if (execution is ExecuteWithPort<R>) {
task = TaskWithPort<R>(
id: id,
workPriority: priority,
execution: execution,
completer: completer,
onMessage: onMessage!,
);
} else if (execution is ExecuteGentle<R>) {
task = TaskGentle<R>(id: id, workPriority: priority, execution: execution, completer: completer);
} else if (execution is ExecuteGentleWithPort<R>) {
task = TaskGentleWithPort<R>(
id: id,
workPriority: priority,
execution: execution,
completer: completer,
onMessage: onMessage!,
);
} else if (execution is Execute<R>) {
task = TaskRegular<R>(id: id, workPriority: priority, execution: execution, completer: completer);
}
final task = _GentleTask<R>(id: id, workPriority: priority, execution: execution, completer: Completer<R>());
_queue.add(task);
_schedule();
logTaskAdded(task.id);
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
}
void _createWorkers() {
for (var i = 0; i < _isolatesCount; i++) {
_pool.add(IsolateWorker());
}
}
Future<void> _initializeWorkers() async {
await Future.wait(_pool.map((e) => e.initialize()));
}
Future<void> _ensureWorkersInitialized() async {
if (_pool.isEmpty) {
_createWorkers();
@ -240,7 +167,9 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
)
.whenComplete(() {
if (_dynamicSpawning && _queue.isEmpty) {
availableWorker.kill();
// Retire the idle worker; shutdown() nulls its fields so the husk
// stays pooled and is revived by initialize() if work arrives.
unawaited(availableWorker.shutdown());
}
_schedule();
});
@ -250,15 +179,8 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
void _cancel(Task task) {
task.cancel();
_queue.remove(task);
final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id);
if (task is Gentle) {
targetWorker?.cancelGentle();
} else {
targetWorker?.kill();
if (!_dynamicSpawning) {
targetWorker?.initialize();
}
}
// All tasks are gentle: signal cancellation; the worker unwinds on its own.
_pool.firstWhereOrNull((worker) => worker.taskId == task.id)?.cancelGentle();
super._cancel(task);
}
}

View File

@ -105,25 +105,26 @@ class CloudIdResult {
@HostApi()
abstract class NativeSyncApi {
@async
bool shouldFullSync();
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
@async
SyncDelta getMediaChanges();
void checkpointSync();
void clearSyncCheckpoint();
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
@async
List<String> getAssetIdsForAlbum(String albumId);
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
@async
List<PlatformAlbum> getAlbums();
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
int getAssetsCountSince(String albumId, int timestamp);
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
@async
List<PlatformAsset> getAssetsForAlbum(String albumId, {int? updatedTimeCond});
@async
@ -132,6 +133,8 @@ abstract class NativeSyncApi {
void cancelHashing();
void cancelSync();
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
Map<String, List<PlatformAsset>> getTrashedAssets();

View File

@ -36,13 +36,6 @@ class _AbortCallbackWrapper {
class _MockAbortCallbackWrapper extends Mock implements _AbortCallbackWrapper {}
class _CancellationWrapper {
const _CancellationWrapper();
bool call() => false;
}
class _MockCancellationWrapper extends Mock implements _CancellationWrapper {}
void main() {
late SyncStreamService sut;
@ -94,9 +87,13 @@ void main() {
when(() => mockAbortCallbackWrapper()).thenReturn(false);
when(() => mockSyncApiRepo.streamChanges(any(), serverVersion: any(named: 'serverVersion'))).thenAnswer((
invocation,
) async {
when(
() => mockSyncApiRepo.streamChanges(
any(),
serverVersion: any(named: 'serverVersion'),
abortSignal: any(named: 'abortSignal'),
),
).thenAnswer((invocation) async {
handleEventsCallback = invocation.positionalArguments.first;
});
@ -105,6 +102,7 @@ void main() {
any(),
onReset: any(named: 'onReset'),
serverVersion: any(named: 'serverVersion'),
abortSignal: any(named: 'abortSignal'),
),
).thenAnswer((invocation) async {
handleEventsCallback = invocation.positionalArguments.first;
@ -233,8 +231,7 @@ void main() {
});
test("aborts and stops processing if cancelled during iteration", () async {
final cancellationChecker = _MockCancellationWrapper();
when(() => cancellationChecker()).thenReturn(false);
final cancellation = Completer<void>();
sut = SyncStreamService(
syncApiRepository: mockSyncApiRepo,
@ -243,7 +240,7 @@ void main() {
trashedLocalAssetRepository: mockTrashedLocalAssetRepo,
assetMediaRepository: mockAssetMediaRepo,
permissionRepository: mockPermissionRepo,
cancelChecker: cancellationChecker.call,
cancellation: cancellation,
api: mockApi,
syncMigrationRepository: mockSyncMigrationRepo,
);
@ -252,7 +249,7 @@ void main() {
final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1];
when(() => mockSyncStreamRepo.deleteUsersV1(any())).thenAnswer((_) async {
when(() => cancellationChecker()).thenReturn(true);
cancellation.complete();
});
await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call);
@ -267,8 +264,7 @@ void main() {
});
test("aborts and stops processing if cancelled before processing batch", () async {
final cancellationChecker = _MockCancellationWrapper();
when(() => cancellationChecker()).thenReturn(false);
final cancellation = Completer<void>();
final processingCompleter = Completer<void>();
bool handler1Started = false;
@ -284,7 +280,7 @@ void main() {
trashedLocalAssetRepository: mockTrashedLocalAssetRepo,
assetMediaRepository: mockAssetMediaRepo,
permissionRepository: mockPermissionRepo,
cancelChecker: cancellationChecker.call,
cancellation: cancellation,
api: mockApi,
syncMigrationRepository: mockSyncMigrationRepo,
);
@ -303,7 +299,7 @@ void main() {
expect(handler1Started, isTrue);
// Signal cancellation while handler 1 is waiting
when(() => cancellationChecker()).thenReturn(true);
cancellation.complete();
await pumpEventQueue();
processingCompleter.complete();

View File

@ -0,0 +1,23 @@
import 'package:flutter_test/flutter_test.dart';
import 'package:immich_mobile/wm_executor.dart';
void main() {
tearDown(workerManagerPatch.dispose);
test('dispose() drains a cancelled task and delivers its result', () async {
await workerManagerPatch.init(isolatesCount: 1, dynamicSpawning: false);
final task = workerManagerPatch.executeGentle((onCancel) async {
await onCancel.future;
return 'drained';
});
await workerManagerPatch.dispose();
expect(
await task.timeout(const Duration(seconds: 5)),
'drained',
reason: 'the worker must finish and return its result, not be killed mid-task',
);
});
}