From 92bc22620b1bf77171e509b9e2887f016d124f4a Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Sat, 22 Nov 2025 11:09:34 -0500 Subject: [PATCH] background upload plugin add schemas sync variants formatting initial implementation use existing db, wip move to separate folder fix table definitions wip wiring it up repository pattern --- .../alextran/immich/upload/UploadTask.g.kt | 45 ++- mobile/ios/Runner.xcodeproj/project.pbxproj | 7 + mobile/ios/Runner/AppDelegate.swift | 35 ++- .../Background/BackgroundWorkerApiImpl.swift | 8 +- .../StoreRepository.swift} | 157 ++++++---- .../Runner/Repositories/TaskRepository.swift | 279 ++++++++++++++++++ mobile/ios/Runner/Schemas/Constants.swift | 21 +- mobile/ios/Runner/Schemas/Tables.swift | 39 +-- mobile/ios/Runner/Upload/AssetData.swift | 62 ++++ mobile/ios/Runner/Upload/Delegate.swift | 136 +++++---- mobile/ios/Runner/Upload/DownloadQueue.swift | 216 ++++++-------- mobile/ios/Runner/Upload/Listeners.swift | 68 +++-- mobile/ios/Runner/Upload/NetworkMonitor.swift | 4 +- mobile/ios/Runner/Upload/UploadQueue.swift | 170 +++++------ mobile/ios/Runner/Upload/UploadTask.g.swift | 43 ++- mobile/ios/Runner/Upload/UploadTask.swift | 187 ++++-------- .../repositories/store.repository.dart | 2 + mobile/lib/platform/upload_api.g.dart | 30 +- mobile/pigeon/upload_api.dart | 12 +- 19 files changed, 939 insertions(+), 582 deletions(-) rename mobile/ios/Runner/{Schemas/Store.swift => Repositories/StoreRepository.swift} (51%) create mode 100644 mobile/ios/Runner/Repositories/TaskRepository.swift create mode 100644 mobile/ios/Runner/Upload/AssetData.swift diff --git a/mobile/android/app/src/main/kotlin/app/alextran/immich/upload/UploadTask.g.kt b/mobile/android/app/src/main/kotlin/app/alextran/immich/upload/UploadTask.g.kt index 9be979318b..2e82eda19b 100644 --- a/mobile/android/app/src/main/kotlin/app/alextran/immich/upload/UploadTask.g.kt +++ b/mobile/android/app/src/main/kotlin/app/alextran/immich/upload/UploadTask.g.kt @@ -90,18 +90,19 @@ enum class UploadApiErrorCode(val raw: Int) { NETWORK_ERROR(8), PHOTOS_INTERNAL_ERROR(9), PHOTOS_UNKNOWN_ERROR(10), - NO_SERVER_URL(11), - NO_DEVICE_ID(12), - NO_ACCESS_TOKEN(13), - INTERRUPTED(14), - CANCELLED(15), - DOWNLOAD_STALLED(16), - FORCE_QUIT(17), - OUT_OF_RESOURCES(18), - BACKGROUND_UPDATES_DISABLED(19), - UPLOAD_TIMEOUT(20), - I_CLOUD_RATE_LIMIT(21), - I_CLOUD_THROTTLED(22); + INTERRUPTED(11), + CANCELLED(12), + DOWNLOAD_STALLED(13), + FORCE_QUIT(14), + OUT_OF_RESOURCES(15), + BACKGROUND_UPDATES_DISABLED(16), + UPLOAD_TIMEOUT(17), + I_CLOUD_RATE_LIMIT(18), + I_CLOUD_THROTTLED(19), + INVALID_RESPONSE(20), + BAD_REQUEST(21), + INTERNAL_SERVER_ERROR(22), + UNAUTHORIZED(23); companion object { fun ofRaw(raw: Int): UploadApiErrorCode? { @@ -262,6 +263,7 @@ interface UploadApi { fun cancelAll(callback: (Result) -> Unit) fun enqueueAssets(localIds: List, callback: (Result) -> Unit) fun enqueueFiles(paths: List, callback: (Result) -> Unit) + fun onConfigChange(key: Long, callback: (Result) -> Unit) companion object { /** The codec used by UploadApi. */ @@ -361,6 +363,25 @@ interface UploadApi { channel.setMessageHandler(null) } } + run { + val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.UploadApi.onConfigChange$separatedMessageChannelSuffix", codec) + if (api != null) { + channel.setMessageHandler { message, reply -> + val args = message as List + val keyArg = args[0] as Long + api.onConfigChange(keyArg) { result: Result -> + val error = result.exceptionOrNull() + if (error != null) { + reply.reply(UploadTaskPigeonUtils.wrapError(error)) + } else { + reply.reply(UploadTaskPigeonUtils.wrapResult(null)) + } + } + } + } else { + channel.setMessageHandler(null) + } + } } } } diff --git a/mobile/ios/Runner.xcodeproj/project.pbxproj b/mobile/ios/Runner.xcodeproj/project.pbxproj index 876732f8fc..4d814ef081 100644 --- a/mobile/ios/Runner.xcodeproj/project.pbxproj +++ b/mobile/ios/Runner.xcodeproj/project.pbxproj @@ -167,6 +167,11 @@ path = Upload; sourceTree = ""; }; + FEA74CE22ED223690014C832 /* Repositories */ = { + isa = PBXFileSystemSynchronizedRootGroup; + path = Repositories; + sourceTree = ""; + }; FEB3BA112EBD52860081A5EB /* Schemas */ = { isa = PBXFileSystemSynchronizedRootGroup; path = Schemas; @@ -281,6 +286,7 @@ 97C146F01CF9000F007C117D /* Runner */ = { isa = PBXGroup; children = ( + FEA74CE22ED223690014C832 /* Repositories */, FE14355D2EC446E90009D5AC /* Upload */, FEE084F22EC172080045228E /* Schemas */, FEB3BA112EBD52860081A5EB /* Schemas */, @@ -373,6 +379,7 @@ B231F52D2E93A44A00BC45D1 /* Core */, B2CF7F8C2DDE4EBB00744BF6 /* Sync */, FE14355D2EC446E90009D5AC /* Upload */, + FEA74CE22ED223690014C832 /* Repositories */, FEB3BA112EBD52860081A5EB /* Schemas */, FEE084F22EC172080045228E /* Schemas */, ); diff --git a/mobile/ios/Runner/AppDelegate.swift b/mobile/ios/Runner/AppDelegate.swift index fea19ac1c7..5cbab8a9f9 100644 --- a/mobile/ios/Runner/AppDelegate.swift +++ b/mobile/ios/Runner/AppDelegate.swift @@ -1,5 +1,6 @@ import BackgroundTasks import Flutter +import SQLiteData import UIKit import network_info_plus import path_provider_foundation @@ -9,6 +10,8 @@ import shared_preferences_foundation @main @objc class AppDelegate: FlutterAppDelegate { + private var backgroundCompletionHandlers: [String: () -> Void] = [:] + override func application( _ application: UIApplication, didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]? @@ -53,18 +56,46 @@ import shared_preferences_foundation return super.application(application, didFinishLaunchingWithOptions: launchOptions) } + override func application( + _ application: UIApplication, + handleEventsForBackgroundURLSession identifier: String, + completionHandler: @escaping () -> Void + ) { + backgroundCompletionHandlers[identifier] = completionHandler + } + + func completionHandler(forSession identifier: String) -> (() -> Void)? { + return backgroundCompletionHandlers.removeValue(forKey: identifier) + } + public static func registerPlugins(with engine: FlutterEngine) { NativeSyncApiImpl.register(with: engine.registrar(forPlugin: NativeSyncApiImpl.name)!) ThumbnailApiSetup.setUp(binaryMessenger: engine.binaryMessenger, api: ThumbnailApiImpl()) BackgroundWorkerFgHostApiSetup.setUp(binaryMessenger: engine.binaryMessenger, api: BackgroundWorkerApiImpl()) - + let statusListener = StatusEventListener() StreamStatusStreamHandler.register(with: engine.binaryMessenger, streamHandler: statusListener) let progressListener = ProgressEventListener() StreamProgressStreamHandler.register(with: engine.binaryMessenger, streamHandler: progressListener) + + let dbUrl = try! FileManager.default.url( + for: .documentDirectory, + in: .userDomainMask, + appropriateFor: nil, + create: true + ).appendingPathComponent("immich.sqlite") + let db = try! DatabasePool(path: dbUrl.path) + let storeRepository = StoreRepository(db: db) + let taskRepository = TaskRepository(db: db) + UploadApiSetup.setUp( binaryMessenger: engine.binaryMessenger, - api: UploadApiImpl(statusListener: statusListener, progressListener: progressListener) + api: UploadApiImpl( + storeRepository: storeRepository, + taskRepository: taskRepository, + statusListener: statusListener, + progressListener: progressListener + ) ) } diff --git a/mobile/ios/Runner/Background/BackgroundWorkerApiImpl.swift b/mobile/ios/Runner/Background/BackgroundWorkerApiImpl.swift index a7bbc31ceb..bee642547f 100644 --- a/mobile/ios/Runner/Background/BackgroundWorkerApiImpl.swift +++ b/mobile/ios/Runner/Background/BackgroundWorkerApiImpl.swift @@ -29,15 +29,15 @@ class BackgroundWorkerApiImpl: BackgroundWorkerFgHostApi { public static func registerBackgroundWorkers() { BGTaskScheduler.shared.register( forTaskWithIdentifier: processingTaskID, using: nil) { task in - if task is BGProcessingTask { - handleBackgroundProcessing(task: task as! BGProcessingTask) + if case let task as BGProcessingTask = task { + handleBackgroundProcessing(task: task) } } BGTaskScheduler.shared.register( forTaskWithIdentifier: refreshTaskID, using: nil) { task in - if task is BGAppRefreshTask { - handleBackgroundRefresh(task: task as! BGAppRefreshTask) + if case let task as BGAppRefreshTask = task { + handleBackgroundRefresh(task: task) } } } diff --git a/mobile/ios/Runner/Schemas/Store.swift b/mobile/ios/Runner/Repositories/StoreRepository.swift similarity index 51% rename from mobile/ios/Runner/Schemas/Store.swift rename to mobile/ios/Runner/Repositories/StoreRepository.swift index b80f99a621..7a98666aa2 100644 --- a/mobile/ios/Runner/Schemas/Store.swift +++ b/mobile/ios/Runner/Repositories/StoreRepository.swift @@ -1,5 +1,86 @@ import SQLiteData +protocol StoreProtocol { + func get>(_ key: StoreKey.Typed) throws -> T? + func get>(_ key: StoreKey.Typed) throws -> T? + func set>(_ key: StoreKey.Typed, value: T) throws + func set>(_ key: StoreKey.Typed, value: T) throws + func invalidateCache() +} + +protocol StoreConvertible { + associatedtype StorageType + static var cacheKeyPath: ReferenceWritableKeyPath { get } + static func fromValue(_ value: StorageType) throws(StoreError) -> Self + static func toValue(_ value: Self) throws(StoreError) -> StorageType +} + +final class StoreRepository: StoreProtocol { + private let db: DatabasePool + private static let cache = StoreCache() + private static var lock = os_unfair_lock() + + init(db: DatabasePool) { + self.db = db + } + + func get>(_ key: StoreKey.Typed) throws -> T? { + os_unfair_lock_lock(&Self.lock) + defer { os_unfair_lock_unlock(&Self.lock) } + let cached = Self.cache.get(key) + if _fastPath(cached != nil) { return cached! } + return try db.read { conn in + let query = Store.select(\.intValue).where { $0.id.eq(key.rawValue) } + if let value = try query.fetchOne(conn) ?? nil { + let converted = try T.fromValue(value) + Self.cache.set(key, value: converted) + return converted + } + return nil + } + } + + func get>(_ key: StoreKey.Typed) throws -> T? { + os_unfair_lock_lock(&Self.lock) + defer { os_unfair_lock_unlock(&Self.lock) } + let cached = Self.cache.get(key) + if _fastPath(cached != nil) { return cached! } + return try db.read { conn in + let query = Store.select(\.stringValue).where { $0.id.eq(key.rawValue) } + if let value = try query.fetchOne(conn) ?? nil { + let converted = try T.fromValue(value) + Self.cache.set(key, value: converted) + return converted + } + return nil + } + } + + func set>(_ key: StoreKey.Typed, value: T) throws { + os_unfair_lock_lock(&Self.lock) + defer { os_unfair_lock_unlock(&Self.lock) } + let converted = try T.toValue(value) + try db.write { conn in + try Store.upsert { Store(id: key.rawValue, stringValue: nil, intValue: converted) }.execute(conn) + } + Self.cache.set(key, value: value) + } + + func set>(_ key: StoreKey.Typed, value: T) throws { + os_unfair_lock_lock(&Self.lock) + defer { os_unfair_lock_unlock(&Self.lock) } + let converted = try T.toValue(value) + try db.write { conn in + try Store.upsert { Store(id: key.rawValue, stringValue: converted, intValue: nil) }.execute(conn) + } + Self.cache.set(key, value: value) + } + + func invalidateCache() { + Self.cache.reset() + } +} + enum StoreError: Error { case invalidJSON(String) case invalidURL(String) @@ -7,29 +88,21 @@ enum StoreError: Error { case notFound } -protocol StoreConvertible { - static var cacheKeyPath: ReferenceWritableKeyPath { get } - associatedtype StorageType - static func fromValue(_ value: StorageType) throws(StoreError) -> Self - static func toValue(_ value: Self) throws(StoreError) -> StorageType -} - extension StoreConvertible { - static func get(_ cache: StoreCache, key: StoreKey) -> Self? { - os_unfair_lock_lock(&cache.lock) - defer { os_unfair_lock_unlock(&cache.lock) } + fileprivate static func get(_ cache: StoreCache, key: StoreKey) -> Self? { return cache[keyPath: cacheKeyPath][key] } - static func set(_ cache: StoreCache, key: StoreKey, value: Self?) { - os_unfair_lock_lock(&cache.lock) - defer { os_unfair_lock_unlock(&cache.lock) } + fileprivate static func set(_ cache: StoreCache, key: StoreKey, value: Self?) { cache[keyPath: cacheKeyPath][key] = value } + + fileprivate static func reset(_ cache: StoreCache) { + cache.reset() + } } final class StoreCache { - fileprivate var lock = os_unfair_lock() fileprivate var intCache: [StoreKey: Int] = [:] fileprivate var boolCache: [StoreKey: Bool] = [:] fileprivate var dateCache: [StoreKey: Date] = [:] @@ -39,11 +112,21 @@ final class StoreCache { fileprivate var stringDictCache: [StoreKey: [String: String]] = [:] func get(_ key: StoreKey.Typed) -> T? { - T.get(self, key: key.rawValue) + return T.get(self, key: key.rawValue) } func set(_ key: StoreKey.Typed, value: T?) { - T.set(self, key: key.rawValue, value: value) + return T.set(self, key: key.rawValue, value: value) + } + + func reset() { + intCache.removeAll(keepingCapacity: true) + boolCache.removeAll(keepingCapacity: true) + dateCache.removeAll(keepingCapacity: true) + stringCache.removeAll(keepingCapacity: true) + urlCache.removeAll(keepingCapacity: true) + endpointArrayCache.removeAll(keepingCapacity: true) + stringDictCache.removeAll(keepingCapacity: true) } } @@ -82,7 +165,7 @@ extension URL: StoreConvertible { static func toValue(_ value: URL) -> String { value.absoluteString } } -extension StoreConvertible where Self: Codable, StorageType == String { +extension StoreConvertible where Self: Codable { static var jsonDecoder: JSONDecoder { JSONDecoder() } static var jsonEncoder: JSONEncoder { JSONEncoder() } @@ -118,43 +201,3 @@ extension Dictionary: StoreConvertible where Key == String, Value == String { static let cacheKeyPath = \StoreCache.stringDictCache typealias StorageType = String } - -extension Store { - static let cache = StoreCache() - - static func get(_ conn: Database, _ key: StoreKey.Typed) throws -> T? - where T.StorageType == Int { - if let cached = cache.get(key) { return cached } - let query = Store.select(\.intValue).where { $0.id.eq(key.rawValue) } - if let value = try query.fetchOne(conn) ?? nil { - let converted = try T.fromValue(value) - cache.set(key, value: converted) - } - return nil - } - - static func get(_ conn: Database, _ key: StoreKey.Typed) throws -> T? - where T.StorageType == String { - if let cached = cache.get(key) { return cached } - let query = Store.select(\.stringValue).where { $0.id.eq(key.rawValue) } - if let value = try query.fetchOne(conn) ?? nil { - let converted = try T.fromValue(value) - cache.set(key, value: converted) - } - return nil - } - - static func set(_ conn: Database, _ key: StoreKey.Typed, value: T) throws - where T.StorageType == Int { - let converted = try T.toValue(value) - try Store.upsert { Store(id: key.rawValue, stringValue: nil, intValue: converted) }.execute(conn) - cache.set(key, value: value) - } - - static func set(_ conn: Database, _ key: StoreKey.Typed, value: T) throws - where T.StorageType == String { - let converted = try T.toValue(value) - try Store.upsert { Store(id: key.rawValue, stringValue: converted, intValue: nil) }.execute(conn) - cache.set(key, value: value) - } -} diff --git a/mobile/ios/Runner/Repositories/TaskRepository.swift b/mobile/ios/Runner/Repositories/TaskRepository.swift new file mode 100644 index 0000000000..2d70208290 --- /dev/null +++ b/mobile/ios/Runner/Repositories/TaskRepository.swift @@ -0,0 +1,279 @@ +import SQLiteData + +protocol TaskProtocol { + func getTaskIds(status: TaskStatus) async throws -> [Int64] + func getBackupCandidates() async throws -> [LocalAssetCandidate] + func getBackupCandidates(ids: [String]) async throws -> [LocalAssetCandidate] + func getDownloadTasks() async throws -> [LocalAssetDownloadData] + func getUploadTasks() async throws -> [LocalAssetUploadData] + func markOrphansPending(ids: [Int64]) async throws + func markDownloadQueued(taskId: Int64, isLivePhoto: Bool, filePath: URL) async throws + func markUploadQueued(taskId: Int64) async throws + func markDownloadComplete(taskId: Int64, localId: String, hash: String?) async throws -> TaskStatus + func markUploadSuccess(taskId: Int64, livePhotoVideoId: String?) async throws + func retryOrFail(taskId: Int64, code: UploadErrorCode, status: TaskStatus) async throws + func enqueue(assets: [LocalAssetCandidate], imagePriority: Float, videoPriority: Float) async throws + func enqueue(files: [String]) async throws + func resolveError(code: UploadErrorCode) async throws + func getFilename(taskId: Int64) async throws -> String? +} + +final class TaskRepository: TaskProtocol { + private let db: DatabasePool + + init(db: DatabasePool) { + self.db = db + } + + func getTaskIds(status: TaskStatus) async throws -> [Int64] { + return try await db.read { conn in + try UploadTask.select(\.id).where { $0.status.eq(status) }.fetchAll(conn) + } + } + + func getBackupCandidates() async throws -> [LocalAssetCandidate] { + return try await db.read { conn in + return try LocalAsset.backupCandidates.fetchAll(conn) + } + } + + func getBackupCandidates(ids: [String]) async throws -> [LocalAssetCandidate] { + return try await db.read { conn in + return try LocalAsset.backupCandidates.where { $0.id.in(ids) }.fetchAll(conn) + } + } + + func getDownloadTasks() async throws -> [LocalAssetDownloadData] { + return try await db.read({ conn in + return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) } + .where { task, _ in task.canRetry && task.noFatalError && LocalAsset.withChecksum.exists() } + .select { task, asset in + LocalAssetDownloadData.Columns( + checksum: asset.checksum, + createdAt: asset.createdAt, + filename: asset.name, + livePhotoVideoId: task.livePhotoVideoId, + localId: asset.id, + taskId: task.id, + updatedAt: asset.updatedAt + ) + } + .order { task, asset in (task.priority.desc(), task.createdAt) } + .limit { _, _ in UploadTaskStat.availableDownloadSlots } + .fetchAll(conn) + }) + } + + func getUploadTasks() async throws -> [LocalAssetUploadData] { + return try await db.read({ conn in + return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) } + .where { task, _ in task.canRetry && task.noFatalError && LocalAsset.withChecksum.exists() } + .select { task, asset in + LocalAssetUploadData.Columns( + filename: asset.name, + filePath: task.filePath.unwrapped, + priority: task.priority, + taskId: task.id, + type: asset.type + ) + } + .order { task, asset in (task.priority.desc(), task.createdAt) } + .limit { task, _ in UploadTaskStat.availableUploadSlots } + .fetchAll(conn) + }) + } + + func markOrphansPending(ids: [Int64]) async throws { + try await db.write { conn in + try UploadTask.update { + $0.filePath = nil + $0.status = .downloadPending + } + .where { row in row.status.in([TaskStatus.downloadQueued, TaskStatus.uploadPending]) || row.id.in(ids) } + .execute(conn) + } + } + + func markDownloadQueued(taskId: Int64, isLivePhoto: Bool, filePath: URL) async throws { + try await db.write { conn in + try UploadTask.update { + $0.status = .downloadQueued + $0.isLivePhoto = isLivePhoto + $0.filePath = filePath + } + .where { $0.id.eq(taskId) }.execute(conn) + } + } + + func markUploadQueued(taskId: Int64) async throws { + try await db.write { conn in + try UploadTask.update { row in + row.status = .uploadQueued + row.filePath = nil + } + .where { $0.id.eq(taskId) }.execute(conn) + } + } + + func markDownloadComplete(taskId: Int64, localId: String, hash: String?) async throws -> TaskStatus { + return try await db.write { conn in + if let hash { + try LocalAsset.update { $0.checksum = hash }.where { $0.id.eq(localId) }.execute(conn) + } + let status = + if let hash, try RemoteAsset.select(\.rowid).where({ $0.checksum.eq(hash) }).fetchOne(conn) != nil { + TaskStatus.uploadSkipped + } else { + TaskStatus.uploadPending + } + try UploadTask.update { $0.status = status }.where { $0.id.eq(taskId) }.execute(conn) + return status + } + } + + func markUploadSuccess(taskId: Int64, livePhotoVideoId: String?) async throws { + try await db.write { conn in + let task = + try UploadTask + .update { $0.status = .uploadComplete } + .where { $0.id.eq(taskId) } + .returning(\.self) + .fetchOne(conn) + guard let task, let localId = task.localId, let isLivePhoto = task.isLivePhoto, isLivePhoto, + task.livePhotoVideoId == nil + else { return } + try UploadTask.insert { + UploadTask.Draft( + attempts: 0, + createdAt: Date(), + filePath: nil, + isLivePhoto: true, + lastError: nil, + livePhotoVideoId: livePhotoVideoId, + localId: localId, + method: .multipart, + priority: 0.7, + retryAfter: nil, + status: .downloadPending, + ) + }.execute(conn) + } + } + + func retryOrFail(taskId: Int64, code: UploadErrorCode, status: TaskStatus) async throws { + try await db.write { conn in + try UploadTask.update { row in + let retryOffset = + switch code { + case .iCloudThrottled, .iCloudRateLimit, .notEnoughSpace: 3000 + default: 0 + } + row.status = Case() + .when(row.localId.is(nil) && row.attempts.lte(TaskConfig.maxRetries), then: TaskStatus.uploadPending) + .when(row.attempts.lte(TaskConfig.maxRetries), then: TaskStatus.downloadPending) + .else(status) + row.attempts += 1 + row.lastError = code + row.retryAfter = #sql("unixepoch('now') + (\(4 << row.attempts)) + \(retryOffset)") + } + .where { $0.id.eq(taskId) }.execute(conn) + } + } + + func enqueue(assets: [LocalAssetCandidate], imagePriority: Float, videoPriority: Float) async throws { + try await db.write { conn in + var draft = draftStub + for candidate in assets { + draft.localId = candidate.id + draft.priority = candidate.type == .image ? imagePriority : videoPriority + try UploadTask.insert { + draft + } onConflict: { + ($0.localId, $0.livePhotoVideoId) + } + .execute(conn) + } + } + } + + func enqueue(files: [String]) async throws { + try await db.write { conn in + var draft = draftStub + draft.priority = 1.0 + draft.status = .uploadPending + for file in files { + draft.filePath = URL(fileURLWithPath: file, isDirectory: false) + try UploadTask.insert { draft }.execute(conn) + } + } + } + + func resolveError(code: UploadErrorCode) async throws { + try await db.write { conn in + try UploadTask.update { $0.lastError = nil }.where { $0.lastError.unwrapped.eq(code) }.execute(conn) + } + } + + func getFilename(taskId: Int64) async throws -> String? { + try await db.read { conn in + try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) }.select(\.1.name).fetchOne(conn) + } + } + + private var draftStub: UploadTask.Draft { + .init( + attempts: 0, + createdAt: Date(), + filePath: nil, + isLivePhoto: nil, + lastError: nil, + livePhotoVideoId: nil, + localId: nil, + method: .multipart, + priority: 0.5, + retryAfter: nil, + status: .downloadPending, + ) + } +} + +extension UploadTask.TableColumns { + var noFatalError: some QueryExpression { lastError.is(nil) || !lastError.unwrapped.in(UploadErrorCode.fatal) } + var canRetry: some QueryExpression { + attempts.lte(TaskConfig.maxRetries) && (retryAfter.is(nil) || retryAfter.unwrapped <= Date().unixTime) + } +} + +extension LocalAlbum { + static let selected = Self.where { $0.backupSelection.eq(BackupSelection.selected) } + static let excluded = Self.where { $0.backupSelection.eq(BackupSelection.excluded) } +} + +extension LocalAlbumAsset { + static let selected = Self.where { + $0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.selected.select(\.id)) + } + static let excluded = Self.where { + $0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.excluded.select(\.id)) + } +} + +extension RemoteAsset { + static let currentUser = Self.where { _ in + ownerId.eq(Store.select(\.stringValue).where { $0.id.eq(StoreKey.currentUser.rawValue) }.unwrapped) + } +} + +extension LocalAsset { + static let withChecksum = Self.where { $0.checksum.isNot(nil) } + static let shouldBackup = Self.where { _ in LocalAlbumAsset.selected.exists() && !LocalAlbumAsset.excluded.exists() } + static let notBackedUp = Self.where { local in + !RemoteAsset.currentUser.where { remote in local.checksum.eq(remote.checksum) }.exists() + } + static let backupCandidates = Self + .shouldBackup + .notBackedUp + .where { local in !UploadTask.where { $0.localId.eq(local.id) }.exists() } + .select { LocalAssetCandidate.Columns(id: $0.id, type: $0.type) } + .limit { _ in UploadTaskStat.availableSlots } +} diff --git a/mobile/ios/Runner/Schemas/Constants.swift b/mobile/ios/Runner/Schemas/Constants.swift index 07c47dbe12..1b23e3647d 100644 --- a/mobile/ios/Runner/Schemas/Constants.swift +++ b/mobile/ios/Runner/Schemas/Constants.swift @@ -2,6 +2,8 @@ import SQLiteData extension Notification.Name { static let networkDidConnect = Notification.Name("networkDidConnect") + static let downloadTaskDidComplete = Notification.Name("downloadTaskDidComplete") + static let uploadTaskDidComplete = Notification.Name("uploadTaskDidComplete") } enum TaskConfig { @@ -12,6 +14,7 @@ enum TaskConfig { static let sessionId = "app.mertalev.immich.upload" static let downloadCheckIntervalNs: UInt64 = 30_000_000_000 // 30 seconds static let downloadTimeoutS = TimeInterval(60) + static let progressThrottleInterval = TimeInterval(0.1) static let transferSpeedAlpha = 0.4 static let originalsDir = FileManager.default.temporaryDirectory.appendingPathComponent( "originals", @@ -214,6 +217,18 @@ enum UploadError: Error { case fileCreationFailed case iCloudError(UploadErrorCode) case photosError(UploadErrorCode) + case unknown + + var code: UploadErrorCode { + switch self { + case .iCloudError(let code), .photosError(let code): + return code + case .unknown: + return .unknown + case .fileCreationFailed: + return .writeFailed + } + } } enum UploadErrorCode: Int, QueryBindable { @@ -228,9 +243,6 @@ enum UploadErrorCode: Int, QueryBindable { case networkError case photosInternalError case photosUnknownError - case noServerUrl - case noDeviceId - case noAccessToken case interrupted case cancelled case downloadStalled @@ -243,6 +255,9 @@ enum UploadErrorCode: Int, QueryBindable { case invalidResponse case badRequest case internalServerError + case unauthorized + + static let fatal: [UploadErrorCode] = [.assetNotFound, .resourceNotFound, .invalidResource, .badRequest, .unauthorized] } enum AssetType: Int, QueryBindable { diff --git a/mobile/ios/Runner/Schemas/Tables.swift b/mobile/ios/Runner/Schemas/Tables.swift index b9f18417f8..0fbea4fb59 100644 --- a/mobile/ios/Runner/Schemas/Tables.swift +++ b/mobile/ios/Runner/Schemas/Tables.swift @@ -73,11 +73,6 @@ struct LocalAlbum: Identifiable { let updatedAt: Date } -extension LocalAlbum { - static let selected = Self.where { $0.backupSelection.eq(BackupSelection.selected) } - static let excluded = Self.where { $0.backupSelection.eq(BackupSelection.excluded) } -} - @Table("local_album_asset_entity") struct LocalAlbumAsset { let id: ID @@ -93,15 +88,6 @@ struct LocalAlbumAsset { } } -extension LocalAlbumAsset { - static let selected = Self.where { - $0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.selected.select(\.id)) - } - static let excluded = Self.where { - $0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.excluded.select(\.id)) - } -} - @Table("local_asset_entity") struct LocalAsset: Identifiable { let id: String @@ -119,18 +105,6 @@ struct LocalAsset: Identifiable { @Column("updated_at") let updatedAt: String let width: Int? - - static func getCandidates() -> Where { - return Self.where { local in - LocalAlbumAsset.selected.exists() - && !LocalAlbumAsset.excluded.exists() - && !RemoteAsset.where { - local.checksum.eq($0.checksum) - && $0.ownerId.eq(Store.select(\.stringValue).where { $0.id.eq(StoreKey.currentUser.rawValue) }.unwrapped) - }.exists() - && !UploadTask.where { $0.localId.eq(local.id) }.exists() - } - } } @Selection @@ -143,6 +117,7 @@ struct LocalAssetCandidate { struct LocalAssetDownloadData { let checksum: String? let createdAt: String + let filename: String let livePhotoVideoId: RemoteAsset.ID? let localId: LocalAsset.ID let taskId: UploadTask.ID @@ -151,6 +126,7 @@ struct LocalAssetDownloadData { @Selection struct LocalAssetUploadData { + let filename: String let filePath: URL let priority: Float let taskId: UploadTask.ID @@ -375,16 +351,7 @@ struct UploadTask: Identifiable { var priority: Float @Column("retry_after", as: Date?.UnixTimeRepresentation.self) let retryAfter: Date? - let status: TaskStatus - - static func retryOrFail(code: UploadErrorCode, status: TaskStatus) -> Update { - return Self.update { row in - row.status = Case().when(row.attempts.lte(TaskConfig.maxRetries), then: TaskStatus.downloadPending).else(status) - row.attempts += 1 - row.lastError = code - row.retryAfter = #sql("unixepoch('now') + (\(4 << row.attempts))") - } - } + var status: TaskStatus } @Table("upload_task_stats") diff --git a/mobile/ios/Runner/Upload/AssetData.swift b/mobile/ios/Runner/Upload/AssetData.swift new file mode 100644 index 0000000000..c353e18957 --- /dev/null +++ b/mobile/ios/Runner/Upload/AssetData.swift @@ -0,0 +1,62 @@ +import StructuredFieldValues + +struct AssetData: StructuredFieldValue { + static let structuredFieldType: StructuredFieldType = .dictionary + + let deviceAssetId: String + let deviceId: String + let fileCreatedAt: String + let fileModifiedAt: String + let fileName: String + let isFavorite: Bool + let livePhotoVideoId: String? + + static let boundary = "Boundary-\(UUID().uuidString)" + static let deviceAssetIdField = "--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceAssetId\"\r\n\r\n" + .data(using: .utf8)! + static let deviceIdField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceId\"\r\n\r\n" + .data(using: .utf8)! + static let fileCreatedAtField = + "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileCreatedAt\"\r\n\r\n" + .data(using: .utf8)! + static let fileModifiedAtField = + "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileModifiedAt\"\r\n\r\n" + .data(using: .utf8)! + static let isFavoriteField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"isFavorite\"\r\n\r\n" + .data(using: .utf8)! + static let livePhotoVideoIdField = + "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"livePhotoVideoId\"\r\n\r\n" + .data(using: .utf8)! + static let trueData = "true".data(using: .utf8)! + static let falseData = "false".data(using: .utf8)! + static let footer = "\r\n--\(boundary)--\r\n".data(using: .utf8)! + static let contentType = "multipart/form-data; boundary=\(boundary)" + + func multipart() -> (Data, Data) { + var header = Data() + header.append(Self.deviceAssetIdField) + header.append(deviceAssetId.data(using: .utf8)!) + + header.append(Self.deviceIdField) + header.append(deviceId.data(using: .utf8)!) + + header.append(Self.fileCreatedAtField) + header.append(fileCreatedAt.data(using: .utf8)!) + + header.append(Self.fileModifiedAtField) + header.append(fileModifiedAt.data(using: .utf8)!) + + header.append(Self.isFavoriteField) + header.append(isFavorite ? Self.trueData : Self.falseData) + + if let livePhotoVideoId { + header.append(Self.livePhotoVideoIdField) + header.append(livePhotoVideoId.data(using: .utf8)!) + } + header.append( + "\r\n--\(Self.boundary)\r\nContent-Disposition: form-data; name=\"assetData\"; filename=\"\(fileName)\"\r\nContent-Type: application/octet-stream\r\n\r\n" + .data(using: .utf8)! + ) + return (header, Self.footer) + } +} diff --git a/mobile/ios/Runner/Upload/Delegate.swift b/mobile/ios/Runner/Upload/Delegate.swift index b3c5a4bdff..418c6a8cc4 100644 --- a/mobile/ios/Runner/Upload/Delegate.swift +++ b/mobile/ios/Runner/Upload/Delegate.swift @@ -1,19 +1,33 @@ import SQLiteData -class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegate { - private static let stateLock = NSLock() - private static var transferStates: [Int64: NetworkTransferState] = [:] - private static var responseData: [Int64: Data] = [:] - private static let jsonDecoder = JSONDecoder() +private let stateLock = NSLock() +private var transferStates: [Int64: NetworkTransferState] = [:] +private var responseData: [Int64: Data] = [:] +private let jsonDecoder = JSONDecoder() - private let db: DatabasePool - private let statusListener: StatusEventListener - private let progressListener: ProgressEventListener - weak var downloadQueue: DownloadQueue? - weak var uploadQueue: UploadQueue? +private class NetworkTransferState { + var lastUpdateTime: Date + var totalBytesTransferred: Int64 + var currentSpeed: Double? - init(db: DatabasePool, statusListener: StatusEventListener, progressListener: ProgressEventListener) { - self.db = db + init(lastUpdateTime: Date, totalBytesTransferred: Int64, currentSpeed: Double?) { + self.lastUpdateTime = lastUpdateTime + self.totalBytesTransferred = totalBytesTransferred + self.currentSpeed = currentSpeed + } +} + +final class UploadApiDelegate< + TaskRepo: TaskProtocol, + StatusListener: TaskStatusListener, + ProgressListener: TaskProgressListener +>: NSObject, URLSessionDataDelegate, URLSessionTaskDelegate { + private let taskRepository: TaskRepo + private let statusListener: StatusListener + private let progressListener: ProgressListener + + init(taskRepository: TaskRepo, statusListener: StatusListener, progressListener: ProgressListener) { + self.taskRepository = taskRepository self.statusListener = statusListener self.progressListener = progressListener } @@ -30,11 +44,11 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat let taskId = Int64(taskIdStr) else { return } - Self.stateLock.withLock { - if var response = Self.responseData[taskId] { + stateLock.withLock { + if var response = responseData[taskId] { response.append(data) } else { - Self.responseData[taskId] = data + responseData[taskId] = data } } } @@ -42,8 +56,7 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { Task { defer { - downloadQueue?.startQueueProcessing() - uploadQueue?.startQueueProcessing() + NotificationCenter.default.post(name: .uploadTaskDidComplete, object: nil) } guard let taskDescriptionId = task.taskDescription, @@ -53,25 +66,27 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat } defer { - Self.stateLock.withLock { let _ = Self.transferStates.removeValue(forKey: taskId) } + stateLock.withLock { let _ = transferStates.removeValue(forKey: taskId) } } - if let responseData = Self.stateLock.withLock({ Self.responseData.removeValue(forKey: taskId) }), - let httpResponse = task.response as? HTTPURLResponse + if let body = stateLock.withLock({ responseData.removeValue(forKey: taskId) }), + let response = task.response as? HTTPURLResponse { - switch httpResponse.statusCode { + switch response.statusCode { case 200, 201: do { - let response = try Self.jsonDecoder.decode(UploadSuccessResponse.self, from: responseData) + let response = try jsonDecoder.decode(UploadSuccessResponse.self, from: body) return await handleSuccess(taskId: taskId, response: response) } catch { return await handleFailure(taskId: taskId, code: .invalidResponse) } + case 401: return await handleFailure(taskId: taskId, code: .unauthorized) case 400..<500: - dPrint( - "Response \(httpResponse.statusCode): \(String(data: responseData, encoding: .utf8) ?? "No response body")" - ) + dPrint("Response \(response.statusCode): \(String(data: body, encoding: .utf8) ?? "No response body")") return await handleFailure(taskId: taskId, code: .badRequest) + case 500..<600: + dPrint("Response \(response.statusCode): \(String(data: body, encoding: .utf8) ?? "No response body")") + return await handleFailure(taskId: taskId, code: .internalServerError) default: break } @@ -111,8 +126,8 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat ) { guard let sessionTaskId = task.taskDescription, let taskId = Int64(sessionTaskId) else { return } let currentTime = Date() - let state = Self.stateLock.withLock { - if let existing = Self.transferStates[taskId] { + let state = stateLock.withLock { + if let existing = transferStates[taskId] { return existing } let new = NetworkTransferState( @@ -120,7 +135,7 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat totalBytesTransferred: totalBytesSent, currentSpeed: nil ) - Self.transferStates[taskId] = new + transferStates[taskId] = new return new } @@ -147,30 +162,29 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat ) } + func urlSessionDidFinishEvents(forBackgroundURLSession session: URLSession) { + dPrint("All background events delivered for session: \(session.configuration.identifier ?? "unknown")") + DispatchQueue.main.async { + if let identifier = session.configuration.identifier, + let appDelegate = UIApplication.shared.delegate as? AppDelegate, + let completionHandler = appDelegate.completionHandler(forSession: identifier) + { + completionHandler() + } + } + } + private func handleSuccess(taskId: Int64, response: UploadSuccessResponse) async { dPrint("Upload succeeded for task \(taskId), server ID: \(response.id)") do { - try await db.write { conn in - let task = try UploadTask.update { $0.status = .uploadComplete }.where({ $0.id.eq(taskId) }) - .returning(\.self).fetchOne(conn) - guard let task, let isLivePhoto = task.isLivePhoto, isLivePhoto, task.livePhotoVideoId == nil else { return } - try UploadTask.insert { - UploadTask.Draft( - attempts: 0, - createdAt: Date(), - filePath: nil, - isLivePhoto: true, - lastError: nil, - livePhotoVideoId: response.id, - localId: task.localId, - method: .multipart, - priority: 0.7, - retryAfter: nil, - status: .downloadPending, - ) - }.execute(conn) - } - dPrint("Updated upload success status for session task \(taskId)") + try await taskRepository.markUploadSuccess(taskId: taskId, livePhotoVideoId: response.id) + statusListener.onTaskStatus( + UploadApiTaskStatus( + id: String(taskId), + filename: (try? await taskRepository.getFilename(taskId: taskId)) ?? "", + status: .uploadComplete + ) + ) } catch { dPrint( "Failed to update upload success status for session task \(taskId): \(error.localizedDescription)" @@ -180,10 +194,14 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat private func handleFailure(taskId: Int64, code: UploadErrorCode = .unknown) async { dPrint("Upload failed for task \(taskId) with code \(code)") - try? await db.write { conn in - try UploadTask.retryOrFail(code: code, status: .uploadFailed).where { $0.id.eq(taskId) } - .execute(conn) - } + try? await taskRepository.retryOrFail(taskId: taskId, code: code, status: .uploadFailed) + statusListener.onTaskStatus( + UploadApiTaskStatus( + id: String(taskId), + filename: (try? await taskRepository.getFilename(taskId: taskId)) ?? "", + status: .uploadFailed + ) + ) } @available(iOS 17, *) @@ -193,16 +211,4 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat resumeTask.taskDescription = taskDescriptionId resumeTask.resume() } - - private class NetworkTransferState { - var lastUpdateTime: Date - var totalBytesTransferred: Int64 - var currentSpeed: Double? - - init(lastUpdateTime: Date, totalBytesTransferred: Int64, currentSpeed: Double?) { - self.lastUpdateTime = lastUpdateTime - self.totalBytesTransferred = totalBytesTransferred - self.currentSpeed = currentSpeed - } - } } diff --git a/mobile/ios/Runner/Upload/DownloadQueue.swift b/mobile/ios/Runner/Upload/DownloadQueue.swift index f7fcb55800..655fb2d910 100644 --- a/mobile/ios/Runner/Upload/DownloadQueue.swift +++ b/mobile/ios/Runner/Upload/DownloadQueue.swift @@ -1,131 +1,105 @@ import CryptoKit import Photos -import SQLiteData -class DownloadQueue { - private static let resourceManager = PHAssetResourceManager.default() - private static var queueProcessingTask: Task? - private static var queueProcessingLock = NSLock() +private var queueProcessingTask: Task? +private var queueProcessingLock = NSLock() +private let resourceManager = PHAssetResourceManager.default() - private let db: DatabasePool - private let uploadQueue: UploadQueue - private let statusListener: StatusEventListener - private let progressListener: ProgressEventListener +private final class RequestRef { + var id: PHAssetResourceDataRequestID? + var lastProgressTime = Date() + var didStall = false +} + +final class DownloadQueue< + StoreRepo: StoreProtocol, + TaskRepo: TaskProtocol, + StatusListener: TaskStatusListener, + ProgressListener: TaskProgressListener +> { + private let storeRepository: StoreRepo + private let taskRepository: TaskRepo + private let statusListener: StatusListener + private let progressListener: ProgressListener + private var uploadObserver: NSObjectProtocol? + private var networkObserver: NSObjectProtocol? init( - db: DatabasePool, - uploadQueue: UploadQueue, - statusListener: StatusEventListener, - progressListener: ProgressEventListener + storeRepository: StoreRepo, + taskRepository: TaskRepo, + statusListener: StatusListener, + progressListener: ProgressListener ) { - self.db = db - self.uploadQueue = uploadQueue + self.storeRepository = storeRepository + self.taskRepository = taskRepository self.statusListener = statusListener self.progressListener = progressListener - NotificationCenter.default.addObserver(forName: .networkDidConnect, object: nil, queue: nil) { [weak self] _ in + uploadObserver = NotificationCenter.default.addObserver(forName: .uploadTaskDidComplete, object: nil, queue: nil) { + [weak self] _ in + self?.startQueueProcessing() + } + networkObserver = NotificationCenter.default.addObserver(forName: .networkDidConnect, object: nil, queue: nil) { + [weak self] _ in dPrint("Network connected") self?.startQueueProcessing() } } + deinit { + uploadObserver.map(NotificationCenter.default.removeObserver(_:)) + networkObserver.map(NotificationCenter.default.removeObserver(_:)) + } + func enqueueAssets(localIds: [String]) async throws { guard !localIds.isEmpty else { return dPrint("No assets to enqueue") } defer { startQueueProcessing() } - let candidates = try await db.read { conn in - return try LocalAsset.all - .where { asset in asset.id.in(localIds) } - .select { LocalAssetCandidate.Columns(id: $0.id, type: $0.type) } - .limit { _ in UploadTaskStat.availableSlots } - .fetchAll(conn) - } + let candidates = try await taskRepository.getBackupCandidates(ids: localIds) guard !candidates.isEmpty else { return dPrint("No candidates to enqueue") } - try await db.write { conn in - var draft = UploadTask.Draft( - attempts: 0, - createdAt: Date(), - filePath: nil, - isLivePhoto: nil, - lastError: nil, - livePhotoVideoId: nil, - localId: "", - method: .multipart, - priority: 0.5, - retryAfter: nil, - status: .downloadPending, - ) - for candidate in candidates { - draft.localId = candidate.id - draft.priority = candidate.type == .image ? 0.9 : 0.8 - try UploadTask.insert { - draft - } onConflict: { - ($0.localId, $0.livePhotoVideoId) - }.execute(conn) - } - } + try await taskRepository.enqueue(assets: candidates, imagePriority: 0.9, videoPriority: 0.8) dPrint("Enqueued \(candidates.count) assets for upload") } func startQueueProcessing() { dPrint("Starting download queue processing") - Self.queueProcessingLock.withLock { - guard Self.queueProcessingTask == nil else { return } - Self.queueProcessingTask = Task { + queueProcessingLock.withLock { + guard queueProcessingTask == nil else { return } + queueProcessingTask = Task { await startDownloads() - Self.queueProcessingLock.withLock { Self.queueProcessingTask = nil } + queueProcessingLock.withLock { queueProcessingTask = nil } } } } private func startDownloads() async { dPrint("Processing download queue") - guard NetworkMonitor.shared.isConnected else { - return dPrint("Download queue paused: network disconnected") + + guard await UIApplication.shared.applicationState != .background else { + return dPrint("Not processing downloads in background") // TODO: run in processing tasks } + guard NetworkMonitor.shared.isConnected, + let backupEnabled = try? storeRepository.get(StoreKey.enableBackup), backupEnabled, + let deviceId = try? storeRepository.get(StoreKey.deviceId) + else { return dPrint("Download queue paused: missing preconditions") } + do { - let tasks: [LocalAssetDownloadData] = try await db.read({ conn in - guard let backupEnabled = try Store.get(conn, StoreKey.enableBackup), backupEnabled else { return [] } - return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) } - .where { task, asset in - asset.checksum.isNot(nil) && task.status.eq(TaskStatus.downloadPending) - && task.attempts < TaskConfig.maxRetries - && (task.retryAfter.is(nil) || task.retryAfter.unwrapped <= Date().unixTime) - && (task.lastError.is(nil) - || !task.lastError.unwrapped.in([ - UploadErrorCode.assetNotFound, UploadErrorCode.resourceNotFound, UploadErrorCode.invalidResource, - ])) - } - .select { task, asset in - LocalAssetDownloadData.Columns( - checksum: asset.checksum, - createdAt: asset.createdAt, - livePhotoVideoId: task.livePhotoVideoId, - localId: asset.id, - taskId: task.id, - updatedAt: asset.updatedAt - ) - } - .order { task, asset in (task.priority.desc(), task.createdAt) } - .limit { _, _ in UploadTaskStat.availableDownloadSlots } - .fetchAll(conn) - }) + let tasks = try await taskRepository.getDownloadTasks() if tasks.isEmpty { return dPrint("No download tasks to process") } try await withThrowingTaskGroup(of: Void.self) { group in var iterator = tasks.makeIterator() for _ in 0.. TaskConfig.downloadTimeoutS if request.didStall { if let requestId = request.id { - Self.resourceManager.cancelDataRequest(requestId) + resourceManager.cancelDataRequest(requestId) } break } @@ -271,7 +220,7 @@ class DownloadQueue { return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in var hasher = task.checksum == nil && task.livePhotoVideoId == nil ? Insecure.SHA1() : nil - request.id = Self.resourceManager.requestData( + request.id = resourceManager.requestData( for: resource, options: options, dataReceivedHandler: { data in @@ -281,7 +230,7 @@ class DownloadQueue { try fileHandle.write(contentsOf: data) } catch { request.id = nil - Self.resourceManager.cancelDataRequest(requestId) + resourceManager.cancelDataRequest(requestId) } }, completionHandler: { error in @@ -295,7 +244,7 @@ class DownloadQueue { case 81: .iCloudThrottled default: .photosUnknownError } - self.handleFailure(task: task, code: code, filePath: filePath) + continuation.resume(throwing: UploadError.iCloudError(code)) case let e as PHPhotosError: dPrint("Photos error during download: \(e)") let code: UploadErrorCode = @@ -310,10 +259,10 @@ class DownloadQueue { case .userCancelled: .cancelled default: .photosUnknownError } - self.handleFailure(task: task, code: code, filePath: filePath) + continuation.resume(throwing: UploadError.photosError(code)) case .some: dPrint("Unknown error during download: \(String(describing: error))") - self.handleFailure(task: task, code: .unknown, filePath: filePath) + continuation.resume(throwing: UploadError.unknown) case .none: dPrint("Download completed for task \(task.taskId)") do { @@ -321,7 +270,7 @@ class DownloadQueue { continuation.resume(returning: hasher.map { hasher in Data(hasher.finalize()).base64EncodedString() }) } catch { try? FileManager.default.removeItem(at: filePath) - continuation.resume(throwing: error) + continuation.resume(throwing: UploadError.fileCreationFailed) } } } @@ -329,21 +278,26 @@ class DownloadQueue { } } onCancel: { if let requestId = request.id { - Self.resourceManager.cancelDataRequest(requestId) + resourceManager.cancelDataRequest(requestId) } } } - private func handleFailure(task: LocalAssetDownloadData, code: UploadErrorCode, filePath: URL? = nil) { + private func handleFailure(task: LocalAssetDownloadData, code: UploadErrorCode, filePath: URL? = nil) async { dPrint("Handling failure for task \(task.taskId) with code \(code.rawValue)") do { if let filePath { try? FileManager.default.removeItem(at: filePath) } - try db.write { conn in - try UploadTask.retryOrFail(code: code, status: .downloadFailed).where { $0.id.eq(task.taskId) }.execute(conn) - } + try await taskRepository.retryOrFail(taskId: task.taskId, code: code, status: .downloadFailed) + statusListener.onTaskStatus( + UploadApiTaskStatus( + id: String(task.taskId), + filename: task.filename, + status: .downloadFailed + ) + ) } catch { dPrint("Failed to update download failure status for task \(task.taskId): \(error)") } diff --git a/mobile/ios/Runner/Upload/Listeners.swift b/mobile/ios/Runner/Upload/Listeners.swift index dff7e1efdc..9027f386c3 100644 --- a/mobile/ios/Runner/Upload/Listeners.swift +++ b/mobile/ios/Runner/Upload/Listeners.swift @@ -1,4 +1,12 @@ -class StatusEventListener: StreamStatusStreamHandler { +protocol TaskProgressListener { + func onTaskProgress(_ event: UploadApiTaskProgress) +} + +protocol TaskStatusListener { + func onTaskStatus(_ event: UploadApiTaskStatus) +} + +final class StatusEventListener: StreamStatusStreamHandler, TaskStatusListener, @unchecked Sendable { var eventSink: PigeonEventSink? override func onListen(withArguments arguments: Any?, sink: PigeonEventSink) { @@ -6,26 +14,7 @@ class StatusEventListener: StreamStatusStreamHandler { } func onTaskStatus(_ event: UploadApiTaskStatus) { - if let eventSink = eventSink { - eventSink.success(event) - } - } - - func onEventsDone() { - eventSink?.endOfStream() - eventSink = nil - } -} - -class ProgressEventListener: StreamProgressStreamHandler { - var eventSink: PigeonEventSink? - - override func onListen(withArguments arguments: Any?, sink: PigeonEventSink) { - eventSink = sink - } - - func onTaskProgress(_ event: UploadApiTaskProgress) { - if let eventSink = eventSink { + if let eventSink { DispatchQueue.main.async { eventSink.success(event) } } } @@ -37,3 +26,40 @@ class ProgressEventListener: StreamProgressStreamHandler { } } } + +final class ProgressEventListener: StreamProgressStreamHandler, TaskProgressListener, @unchecked Sendable { + var eventSink: PigeonEventSink? + private var lastReportTimes: [String: Date] = [:] + private let lock = NSLock() + + override func onListen(withArguments arguments: Any?, sink: PigeonEventSink) { + eventSink = sink + } + + func onTaskProgress(_ event: UploadApiTaskProgress) { + guard let eventSink, + lock.withLock({ + let now = Date() + if let lastReport = lastReportTimes[event.id] { + guard now.timeIntervalSince(lastReport) >= TaskConfig.progressThrottleInterval else { + return false + } + } + lastReportTimes[event.id] = now + return true + }) + else { return } + + DispatchQueue.main.async { eventSink.success(event) } + } + + func onEventsDone() { + DispatchQueue.main.async { + self.eventSink?.endOfStream() + self.eventSink = nil + self.lock.withLock { + self.lastReportTimes.removeAll() + } + } + } +} diff --git a/mobile/ios/Runner/Upload/NetworkMonitor.swift b/mobile/ios/Runner/Upload/NetworkMonitor.swift index 1d239beb2a..cc2776a4d9 100644 --- a/mobile/ios/Runner/Upload/NetworkMonitor.swift +++ b/mobile/ios/Runner/Upload/NetworkMonitor.swift @@ -1,6 +1,6 @@ import Network -class NetworkMonitor { +final class NetworkMonitor { static let shared = NetworkMonitor() private let monitor = NWPathMonitor() private(set) var isConnected = false @@ -17,6 +17,6 @@ class NetworkMonitor { NotificationCenter.default.post(name: .networkDidConnect, object: nil) } } - monitor.start(queue: .global(qos: .utility)) + monitor.start(queue: .global(qos: .default)) } } diff --git a/mobile/ios/Runner/Upload/UploadQueue.swift b/mobile/ios/Runner/Upload/UploadQueue.swift index fd828e2ad6..d092f1d2a7 100644 --- a/mobile/ios/Runner/Upload/UploadQueue.swift +++ b/mobile/ios/Runner/Upload/UploadQueue.swift @@ -1,28 +1,57 @@ -import SQLiteData -import StructuredFieldValues +private var queueProcessingTask: Task? +private var queueProcessingLock = NSLock() -class UploadQueue { - private static let structuredEncoder = StructuredFieldValueEncoder() - private static var queueProcessingTask: Task? - private static var queueProcessingLock = NSLock() +final class UploadQueue { + private let storeRepository: StoreRepo + private let taskRepository: TaskRepo + private let statusListener: StatusListener - private let db: DatabasePool private let cellularSession: URLSession private let wifiOnlySession: URLSession - private let statusListener: StatusEventListener + private var uploadObserver: NSObjectProtocol? + private var downloadObserver: NSObjectProtocol? + private var networkObserver: NSObjectProtocol? - init(db: DatabasePool, cellularSession: URLSession, wifiOnlySession: URLSession, statusListener: StatusEventListener) - { - self.db = db + init( + storeRepository: StoreRepo, + taskRepository: TaskRepo, + statusListener: StatusListener, + cellularSession: URLSession, + wifiOnlySession: URLSession + ) { + self.storeRepository = storeRepository + self.taskRepository = taskRepository self.cellularSession = cellularSession self.wifiOnlySession = wifiOnlySession self.statusListener = statusListener + + uploadObserver = NotificationCenter.default.addObserver(forName: .uploadTaskDidComplete, object: nil, queue: nil) { + [weak self] _ in + self?.startQueueProcessing() + } + downloadObserver = NotificationCenter.default.addObserver( + forName: .downloadTaskDidComplete, + object: nil, + queue: nil + ) { [weak self] _ in + self?.startQueueProcessing() + } + networkObserver = NotificationCenter.default.addObserver(forName: .networkDidConnect, object: nil, queue: nil) { + [weak self] _ in + self?.startQueueProcessing() + } } - + + deinit { + uploadObserver.map(NotificationCenter.default.removeObserver(_:)) + downloadObserver.map(NotificationCenter.default.removeObserver(_:)) + networkObserver.map(NotificationCenter.default.removeObserver(_:)) + } + func enqueueFiles(paths: [String]) async throws { guard !paths.isEmpty else { return dPrint("No paths to enqueue") } - guard let deviceId = (try? await db.read { conn in try Store.get(conn, StoreKey.deviceId) }) else { + guard let deviceId = try? storeRepository.get(StoreKey.deviceId) else { throw StoreError.notFound } @@ -75,72 +104,36 @@ class UploadQueue { try await group.waitForAll() } - try await db.write { conn in - var draft = UploadTask.Draft( - attempts: 0, - createdAt: Date(), - filePath: nil, - isLivePhoto: nil, - lastError: nil, - livePhotoVideoId: nil, - localId: "", - method: .multipart, - priority: 0.5, - retryAfter: nil, - status: .downloadPending, - ) - for path in paths { - draft.filePath = URL(fileURLWithPath: path, isDirectory: false) - try UploadTask.insert { draft }.execute(conn) - } - } + try await taskRepository.enqueue(files: paths) dPrint("Enqueued \(paths.count) assets for upload") } func startQueueProcessing() { dPrint("Starting upload queue processing") - Self.queueProcessingLock.withLock { - guard Self.queueProcessingTask == nil else { return } - Self.queueProcessingTask = Task { + queueProcessingLock.withLock { + guard queueProcessingTask == nil else { return } + queueProcessingTask = Task { await startUploads() - Self.queueProcessingLock.withLock { Self.queueProcessingTask = nil } + queueProcessingLock.withLock { queueProcessingTask = nil } } } } private func startUploads() async { - dPrint("Processing download queue") + dPrint("Processing upload queue") guard NetworkMonitor.shared.isConnected, - let backupEnabled = try? await db.read({ conn in try Store.get(conn, StoreKey.enableBackup) }), - backupEnabled - else { return dPrint("Download queue paused: network disconnected or backup disabled") } + let backupEnabled = try? storeRepository.get(StoreKey.enableBackup), backupEnabled, + let url = try? storeRepository.get(StoreKey.serverEndpoint), + let accessToken = try? storeRepository.get(StoreKey.accessToken) + else { return dPrint("Upload queue paused: missing preconditions") } do { - let tasks: [LocalAssetUploadData] = try await db.read({ conn in - guard let backupEnabled = try Store.get(conn, StoreKey.enableBackup), backupEnabled else { return [] } - return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) } - .where { task, asset in - asset.checksum.isNot(nil) && task.status.eq(TaskStatus.uploadPending) - && task.attempts < TaskConfig.maxRetries - && task.filePath.isNot(nil) - } - .select { task, asset in - LocalAssetUploadData.Columns( - filePath: task.filePath.unwrapped, - priority: task.priority, - taskId: task.id, - type: asset.type - ) - } - .limit { task, _ in UploadTaskStat.availableUploadSlots } - .order { task, asset in (task.priority.desc(), task.createdAt) } - .fetchAll(conn) - }) + let tasks = try await taskRepository.getUploadTasks() if tasks.isEmpty { return dPrint("No upload tasks to process") } await withTaskGroup(of: Void.self) { group in for task in tasks { - group.addTask { await self.startUpload(multipart: task) } + group.addTask { await self.startUpload(multipart: task, url: url, accessToken: accessToken) } } } } catch { @@ -148,33 +141,18 @@ class UploadQueue { } } - private func startUpload(multipart task: LocalAssetUploadData) async { + private func startUpload(multipart task: LocalAssetUploadData, url: URL, accessToken: String) async { dPrint("Uploading asset resource at \(task.filePath) of task \(task.taskId)") defer { startQueueProcessing() } - let (url, accessToken, session): (URL, String, URLSession) - do { - (url, accessToken, session) = try await db.read { conn in - guard let url = try Store.get(conn, StoreKey.serverEndpoint), - let accessToken = try Store.get(conn, StoreKey.accessToken) - else { - throw StoreError.notFound - } - - let session = - switch task.type { - case .image: - (try? Store.get(conn, StoreKey.useWifiForUploadPhotos)) ?? false ? cellularSession : wifiOnlySession - case .video: - (try? Store.get(conn, StoreKey.useWifiForUploadVideos)) ?? false ? cellularSession : wifiOnlySession - default: wifiOnlySession - } - return (url, accessToken, session) + let session = + switch task.type { + case .image: + (try? storeRepository.get(StoreKey.useWifiForUploadPhotos)) ?? false ? wifiOnlySession : cellularSession + case .video: + (try? storeRepository.get(StoreKey.useWifiForUploadVideos)) ?? false ? wifiOnlySession : cellularSession + default: wifiOnlySession } - } catch { - dPrint("Upload failed for \(task.taskId), could not retrieve server URL or access token: \(error)") - return handleFailure(task: task, code: .noServerUrl) - } var request = URLRequest(url: url.appendingPathComponent("/assets")) request.httpMethod = "POST" @@ -186,18 +164,11 @@ class UploadQueue { sessionTask.priority = task.priority do { try? FileManager.default.removeItem(at: task.filePath) // upload task already copied the file - try await db.write { conn in - try UploadTask.update { row in - row.status = .uploadQueued - row.filePath = nil - } - .where { $0.id.eq(task.taskId) } - .execute(conn) - } + try await taskRepository.markUploadQueued(taskId: task.taskId) statusListener.onTaskStatus( UploadApiTaskStatus( id: String(task.taskId), - filename: task.filePath.lastPathComponent, + filename: task.filename, status: .uploadQueued, ) ) @@ -209,11 +180,16 @@ class UploadQueue { } } - private func handleFailure(task: LocalAssetUploadData, code: UploadErrorCode) { + private func handleFailure(task: LocalAssetUploadData, code: UploadErrorCode) async { do { - try db.write { conn in - try UploadTask.retryOrFail(code: code, status: .uploadFailed).where { $0.id.eq(task.taskId) }.execute(conn) - } + try await taskRepository.retryOrFail(taskId: task.taskId, code: code, status: .uploadFailed) + statusListener.onTaskStatus( + UploadApiTaskStatus( + id: String(task.taskId), + filename: task.filename, + status: .uploadFailed + ) + ) } catch { dPrint("Failed to update upload failure status for task \(task.taskId): \(error)") } diff --git a/mobile/ios/Runner/Upload/UploadTask.g.swift b/mobile/ios/Runner/Upload/UploadTask.g.swift index 5b70d9c211..a6d8906617 100644 --- a/mobile/ios/Runner/Upload/UploadTask.g.swift +++ b/mobile/ios/Runner/Upload/UploadTask.g.swift @@ -122,18 +122,19 @@ enum UploadApiErrorCode: Int { case networkError = 8 case photosInternalError = 9 case photosUnknownError = 10 - case noServerUrl = 11 - case noDeviceId = 12 - case noAccessToken = 13 - case interrupted = 14 - case cancelled = 15 - case downloadStalled = 16 - case forceQuit = 17 - case outOfResources = 18 - case backgroundUpdatesDisabled = 19 - case uploadTimeout = 20 - case iCloudRateLimit = 21 - case iCloudThrottled = 22 + case interrupted = 11 + case cancelled = 12 + case downloadStalled = 13 + case forceQuit = 14 + case outOfResources = 15 + case backgroundUpdatesDisabled = 16 + case uploadTimeout = 17 + case iCloudRateLimit = 18 + case iCloudThrottled = 19 + case invalidResponse = 20 + case badRequest = 21 + case internalServerError = 22 + case unauthorized = 23 } enum UploadApiStatus: Int { @@ -294,6 +295,7 @@ protocol UploadApi { func cancelAll(completion: @escaping (Result) -> Void) func enqueueAssets(localIds: [String], completion: @escaping (Result) -> Void) func enqueueFiles(paths: [String], completion: @escaping (Result) -> Void) + func onConfigChange(key: Int64, completion: @escaping (Result) -> Void) } /// Generated setup class from Pigeon to handle messages through the `binaryMessenger`. @@ -381,6 +383,23 @@ class UploadApiSetup { } else { enqueueFilesChannel.setMessageHandler(nil) } + let onConfigChangeChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.UploadApi.onConfigChange\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) + if let api = api { + onConfigChangeChannel.setMessageHandler { message, reply in + let args = message as! [Any?] + let keyArg = args[0] as! Int64 + api.onConfigChange(key: keyArg) { result in + switch result { + case .success: + reply(wrapResult(nil)) + case .failure(let error): + reply(wrapError(error)) + } + } + } + } else { + onConfigChangeChannel.setMessageHandler(nil) + } } } diff --git a/mobile/ios/Runner/Upload/UploadTask.swift b/mobile/ios/Runner/Upload/UploadTask.swift index 41140c9824..6243fe285a 100644 --- a/mobile/ios/Runner/Upload/UploadTask.swift +++ b/mobile/ios/Runner/Upload/UploadTask.swift @@ -1,5 +1,4 @@ import SQLiteData -import StructuredFieldValues extension FileHandle { static func createOrOverwrite(atPath path: String) throws -> FileHandle { @@ -11,33 +10,41 @@ extension FileHandle { } } -class UploadApiImpl: ImmichPlugin, UploadApi { - private let db: DatabasePool - private let downloadQueue: DownloadQueue - private let uploadQueue: UploadQueue +final class UploadApiImpl< + StoreRepo: StoreProtocol, + TaskRepo: TaskProtocol, + StatusListener: TaskStatusListener, + ProgressListener: TaskProgressListener +>: ImmichPlugin, UploadApi { + private let storeRepository: StoreRepo + private let taskRepository: TaskRepo + private let downloadQueue: DownloadQueue + private let uploadQueue: UploadQueue private var isInitialized = false private let initLock = NSLock() private var backupTask: Task? private let backupLock = NSLock() - private let cellularSession: URLSession private let wifiOnlySession: URLSession - init(statusListener: StatusEventListener, progressListener: ProgressEventListener) { - let dbUrl = try! FileManager.default.url( - for: .documentDirectory, - in: .userDomainMask, - appropriateFor: nil, - create: true - ).appendingPathComponent("immich.sqlite") - - self.db = try! DatabasePool(path: dbUrl.path) + init( + storeRepository: StoreRepo, + taskRepository: TaskRepo, + statusListener: StatusListener, + progressListener: ProgressListener + ) { + self.taskRepository = taskRepository + let delegate = UploadApiDelegate( + taskRepository: taskRepository, + statusListener: statusListener, + progressListener: progressListener + ) let cellularConfig = URLSessionConfiguration.background(withIdentifier: "\(TaskConfig.sessionId).cellular") cellularConfig.allowsCellularAccess = true cellularConfig.waitsForConnectivity = true - let delegate = UploadApiDelegate(db: db, statusListener: statusListener, progressListener: progressListener) + self.cellularSession = URLSession(configuration: cellularConfig, delegate: delegate, delegateQueue: nil) let wifiOnlyConfig = URLSessionConfiguration.background(withIdentifier: "\(TaskConfig.sessionId).wifi") @@ -45,28 +52,26 @@ class UploadApiImpl: ImmichPlugin, UploadApi { wifiOnlyConfig.waitsForConnectivity = true self.wifiOnlySession = URLSession(configuration: wifiOnlyConfig, delegate: delegate, delegateQueue: nil) + self.storeRepository = storeRepository self.uploadQueue = UploadQueue( - db: db, + storeRepository: storeRepository, + taskRepository: taskRepository, + statusListener: statusListener, cellularSession: cellularSession, - wifiOnlySession: wifiOnlySession, - statusListener: statusListener + wifiOnlySession: wifiOnlySession ) self.downloadQueue = DownloadQueue( - db: db, - uploadQueue: uploadQueue, + storeRepository: storeRepository, + taskRepository: taskRepository, statusListener: statusListener, progressListener: progressListener ) - delegate.downloadQueue = downloadQueue - delegate.uploadQueue = uploadQueue } func initialize(completion: @escaping (Result) -> Void) { Task(priority: .high) { do { - async let dbIds = db.read { conn in - try UploadTask.select(\.id).where { $0.status.eq(TaskStatus.uploadQueued) }.fetchAll(conn) - } + async let dbIds = taskRepository.getTaskIds(status: .uploadQueued) async let cellularTasks = cellularSession.allTasks async let wifiTasks = wifiOnlySession.allTasks @@ -84,15 +89,7 @@ class UploadApiImpl: ImmichPlugin, UploadApi { validateTasks(await cellularTasks) validateTasks(await wifiTasks) - let orphanIds = Array(dbTaskIds) - try await db.write { conn in - try UploadTask.update { - $0.filePath = nil - $0.status = .downloadPending - } - .where { row in row.status.in([TaskStatus.downloadQueued, TaskStatus.uploadPending]) || row.id.in(orphanIds) } - .execute(conn) - } + try await taskRepository.markOrphansPending(ids: Array(dbTaskIds)) try? FileManager.default.removeItem(at: TaskConfig.originalsDir) initLock.withLock { isInitialized = true } @@ -139,9 +136,9 @@ class UploadApiImpl: ImmichPlugin, UploadApi { Task { do { try await downloadQueue.enqueueAssets(localIds: localIds) - completion(.success(())) + self.completeWhenActive(for: completion, with: .success(())) } catch { - completion(.failure(error)) + self.completeWhenActive(for: completion, with: .failure(error)) } } } @@ -150,13 +147,24 @@ class UploadApiImpl: ImmichPlugin, UploadApi { Task { do { try await uploadQueue.enqueueFiles(paths: paths) - completion(.success(())) + self.completeWhenActive(for: completion, with: .success(())) } catch { - completion(.failure(error)) + self.completeWhenActive(for: completion, with: .failure(error)) } } } + func onConfigChange(key: Int64, completion: @escaping (Result) -> Void) { + storeRepository.invalidateCache() + Task { + if let key = StoreKey(rawValue: Int(key)), key == ._accessToken { + try? await taskRepository.resolveError(code: .unauthorized) + } + startBackup() + self.completeWhenActive(for: completion, with: .success(())) + } + } + private func cancelSessionTasks(_ tasks: [URLSessionTask]) { dPrint("Canceling \(tasks.count) tasks") for task in tasks { @@ -165,107 +173,20 @@ class UploadApiImpl: ImmichPlugin, UploadApi { } private func _startBackup() async { - defer { downloadQueue.startQueueProcessing() } + defer { + downloadQueue.startQueueProcessing() + uploadQueue.startQueueProcessing() + } + do { - let candidates = try await db.read { conn in - return try LocalAsset.getCandidates() - .where { asset in !UploadTask.where { task in task.localId.eq(asset.id) }.exists() } - .select { LocalAssetCandidate.Columns(id: $0.id, type: $0.type) } - .limit { _ in UploadTaskStat.availableSlots } - .fetchAll(conn) - } + let candidates = try await taskRepository.getBackupCandidates() guard !candidates.isEmpty else { return dPrint("No candidates for backup") } - try await db.write { conn in - var draft = UploadTask.Draft( - attempts: 0, - createdAt: Date(), - filePath: nil, - isLivePhoto: nil, - lastError: nil, - livePhotoVideoId: nil, - localId: "", - method: .multipart, - priority: 0.5, - retryAfter: nil, - status: .downloadPending, - ) - for candidate in candidates { - draft.localId = candidate.id - draft.priority = candidate.type == .image ? 0.5 : 0.3 - try UploadTask.insert { - draft - } onConflict: { - ($0.localId, $0.livePhotoVideoId) - } - .execute(conn) - } - } + try await taskRepository.enqueue(assets: candidates, imagePriority: 0.5, videoPriority: 0.3) dPrint("Backup enqueued \(candidates.count) assets for upload") } catch { print("Backup queue error: \(error)") } } } - -struct AssetData: StructuredFieldValue { - static let structuredFieldType: StructuredFieldType = .dictionary - - let deviceAssetId: String - let deviceId: String - let fileCreatedAt: String - let fileModifiedAt: String - let fileName: String - let isFavorite: Bool - let livePhotoVideoId: String? - - static let boundary = "Boundary-\(UUID().uuidString)" - static let deviceAssetIdField = "--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceAssetId\"\r\n\r\n" - .data(using: .utf8)! - static let deviceIdField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceId\"\r\n\r\n" - .data(using: .utf8)! - static let fileCreatedAtField = - "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileCreatedAt\"\r\n\r\n" - .data(using: .utf8)! - static let fileModifiedAtField = - "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileModifiedAt\"\r\n\r\n" - .data(using: .utf8)! - static let isFavoriteField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"isFavorite\"\r\n\r\n" - .data(using: .utf8)! - static let livePhotoVideoIdField = - "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"livePhotoVideoId\"\r\n\r\n" - .data(using: .utf8)! - static let trueData = "true".data(using: .utf8)! - static let falseData = "false".data(using: .utf8)! - static let footer = "\r\n--\(boundary)--\r\n".data(using: .utf8)! - static let contentType = "multipart/form-data; boundary=\(boundary)" - - func multipart() -> (Data, Data) { - var header = Data() - header.append(Self.deviceAssetIdField) - header.append(deviceAssetId.data(using: .utf8)!) - - header.append(Self.deviceIdField) - header.append(deviceId.data(using: .utf8)!) - - header.append(Self.fileCreatedAtField) - header.append(fileCreatedAt.data(using: .utf8)!) - - header.append(Self.fileModifiedAtField) - header.append(fileModifiedAt.data(using: .utf8)!) - - header.append(Self.isFavoriteField) - header.append(isFavorite ? Self.trueData : Self.falseData) - - if let livePhotoVideoId { - header.append(Self.livePhotoVideoIdField) - header.append(livePhotoVideoId.data(using: .utf8)!) - } - header.append( - "\r\n--\(Self.boundary)\r\nContent-Disposition: form-data; name=\"assetData\"; filename=\"\(fileName)\"\r\nContent-Type: application/octet-stream\r\n\r\n" - .data(using: .utf8)! - ) - return (header, Self.footer) - } -} diff --git a/mobile/lib/infrastructure/repositories/store.repository.dart b/mobile/lib/infrastructure/repositories/store.repository.dart index d4e34a02f5..dbcd8b5762 100644 --- a/mobile/lib/infrastructure/repositories/store.repository.dart +++ b/mobile/lib/infrastructure/repositories/store.repository.dart @@ -5,6 +5,7 @@ import 'package:immich_mobile/infrastructure/entities/store.entity.dart'; import 'package:immich_mobile/infrastructure/entities/store.entity.drift.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/user.repository.dart'; +import 'package:immich_mobile/providers/infrastructure/platform.provider.dart'; import 'package:isar/isar.dart'; // Temporary interface until Isar is removed to make the service work with both Isar and Sqlite @@ -141,6 +142,7 @@ class DriftStoreRepository extends DriftDatabaseRepository implements IStoreRepo @override Future upsert(StoreKey key, T value) async { await _db.storeEntity.insertOnConflictUpdate(await _fromValue(key, value)); + await uploadApi.onConfigChange(key.id); return true; } diff --git a/mobile/lib/platform/upload_api.g.dart b/mobile/lib/platform/upload_api.g.dart index 47755d22d2..85d4cbf71c 100644 --- a/mobile/lib/platform/upload_api.g.dart +++ b/mobile/lib/platform/upload_api.g.dart @@ -41,9 +41,6 @@ enum UploadApiErrorCode { networkError, photosInternalError, photosUnknownError, - noServerUrl, - noDeviceId, - noAccessToken, interrupted, cancelled, downloadStalled, @@ -53,6 +50,10 @@ enum UploadApiErrorCode { uploadTimeout, iCloudRateLimit, iCloudThrottled, + invalidResponse, + badRequest, + internalServerError, + unauthorized, } enum UploadApiStatus { @@ -339,6 +340,29 @@ class UploadApi { return; } } + + Future onConfigChange(int key) async { + final String pigeonVar_channelName = + 'dev.flutter.pigeon.immich_mobile.UploadApi.onConfigChange$pigeonVar_messageChannelSuffix'; + final BasicMessageChannel pigeonVar_channel = BasicMessageChannel( + pigeonVar_channelName, + pigeonChannelCodec, + binaryMessenger: pigeonVar_binaryMessenger, + ); + final Future pigeonVar_sendFuture = pigeonVar_channel.send([key]); + final List? pigeonVar_replyList = await pigeonVar_sendFuture as List?; + if (pigeonVar_replyList == null) { + throw _createConnectionError(pigeonVar_channelName); + } else if (pigeonVar_replyList.length > 1) { + throw PlatformException( + code: pigeonVar_replyList[0]! as String, + message: pigeonVar_replyList[1] as String?, + details: pigeonVar_replyList[2], + ); + } else { + return; + } + } } Stream streamStatus({String instanceName = ''}) { diff --git a/mobile/pigeon/upload_api.dart b/mobile/pigeon/upload_api.dart index 8d26003d00..46b9d988a4 100644 --- a/mobile/pigeon/upload_api.dart +++ b/mobile/pigeon/upload_api.dart @@ -12,9 +12,6 @@ enum UploadApiErrorCode { networkError("Network error"), photosInternalError("Apple Photos internal error"), photosUnknownError("Apple Photos unknown error"), - noServerUrl("Server URL is not set"), - noDeviceId("Device ID is not set"), - noAccessToken("Access token is not set"), interrupted("Upload interrupted"), cancelled("Upload cancelled"), downloadStalled("Download stalled"), @@ -23,7 +20,11 @@ enum UploadApiErrorCode { backgroundUpdatesDisabled("Background updates are disabled"), uploadTimeout("Upload timed out"), iCloudRateLimit("iCloud rate limit reached"), - iCloudThrottled("iCloud requests are being throttled"); + iCloudThrottled("iCloud requests are being throttled"), + invalidResponse("Invalid response from server"), + badRequest("Server rejected the upload request"), + internalServerError("Internal server error"), + unauthorized("Unauthorized access"); final String message; @@ -87,6 +88,9 @@ abstract class UploadApi { @async void enqueueFiles(List paths); + + @async + void onConfigChange(int key); } @EventChannelApi()