background upload plugin

add schemas

sync variants

formatting

initial implementation

use existing db, wip

move to separate folder

fix table definitions

wip

wiring it up

repository pattern
pull/23700/head
mertalev 2025-11-22 11:09:34 -05:00
parent 41f013387f
commit 92bc22620b
No known key found for this signature in database
GPG Key ID: DF6ABC77AAD98C95
19 changed files with 939 additions and 582 deletions

View File

@ -90,18 +90,19 @@ enum class UploadApiErrorCode(val raw: Int) {
NETWORK_ERROR(8), NETWORK_ERROR(8),
PHOTOS_INTERNAL_ERROR(9), PHOTOS_INTERNAL_ERROR(9),
PHOTOS_UNKNOWN_ERROR(10), PHOTOS_UNKNOWN_ERROR(10),
NO_SERVER_URL(11), INTERRUPTED(11),
NO_DEVICE_ID(12), CANCELLED(12),
NO_ACCESS_TOKEN(13), DOWNLOAD_STALLED(13),
INTERRUPTED(14), FORCE_QUIT(14),
CANCELLED(15), OUT_OF_RESOURCES(15),
DOWNLOAD_STALLED(16), BACKGROUND_UPDATES_DISABLED(16),
FORCE_QUIT(17), UPLOAD_TIMEOUT(17),
OUT_OF_RESOURCES(18), I_CLOUD_RATE_LIMIT(18),
BACKGROUND_UPDATES_DISABLED(19), I_CLOUD_THROTTLED(19),
UPLOAD_TIMEOUT(20), INVALID_RESPONSE(20),
I_CLOUD_RATE_LIMIT(21), BAD_REQUEST(21),
I_CLOUD_THROTTLED(22); INTERNAL_SERVER_ERROR(22),
UNAUTHORIZED(23);
companion object { companion object {
fun ofRaw(raw: Int): UploadApiErrorCode? { fun ofRaw(raw: Int): UploadApiErrorCode? {
@ -262,6 +263,7 @@ interface UploadApi {
fun cancelAll(callback: (Result<Unit>) -> Unit) fun cancelAll(callback: (Result<Unit>) -> Unit)
fun enqueueAssets(localIds: List<String>, callback: (Result<Unit>) -> Unit) fun enqueueAssets(localIds: List<String>, callback: (Result<Unit>) -> Unit)
fun enqueueFiles(paths: List<String>, callback: (Result<Unit>) -> Unit) fun enqueueFiles(paths: List<String>, callback: (Result<Unit>) -> Unit)
fun onConfigChange(key: Long, callback: (Result<Unit>) -> Unit)
companion object { companion object {
/** The codec used by UploadApi. */ /** The codec used by UploadApi. */
@ -361,6 +363,25 @@ interface UploadApi {
channel.setMessageHandler(null) channel.setMessageHandler(null)
} }
} }
run {
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.UploadApi.onConfigChange$separatedMessageChannelSuffix", codec)
if (api != null) {
channel.setMessageHandler { message, reply ->
val args = message as List<Any?>
val keyArg = args[0] as Long
api.onConfigChange(keyArg) { result: Result<Unit> ->
val error = result.exceptionOrNull()
if (error != null) {
reply.reply(UploadTaskPigeonUtils.wrapError(error))
} else {
reply.reply(UploadTaskPigeonUtils.wrapResult(null))
}
}
}
} else {
channel.setMessageHandler(null)
}
}
} }
} }
} }

View File

@ -167,6 +167,11 @@
path = Upload; path = Upload;
sourceTree = "<group>"; sourceTree = "<group>";
}; };
FEA74CE22ED223690014C832 /* Repositories */ = {
isa = PBXFileSystemSynchronizedRootGroup;
path = Repositories;
sourceTree = "<group>";
};
FEB3BA112EBD52860081A5EB /* Schemas */ = { FEB3BA112EBD52860081A5EB /* Schemas */ = {
isa = PBXFileSystemSynchronizedRootGroup; isa = PBXFileSystemSynchronizedRootGroup;
path = Schemas; path = Schemas;
@ -281,6 +286,7 @@
97C146F01CF9000F007C117D /* Runner */ = { 97C146F01CF9000F007C117D /* Runner */ = {
isa = PBXGroup; isa = PBXGroup;
children = ( children = (
FEA74CE22ED223690014C832 /* Repositories */,
FE14355D2EC446E90009D5AC /* Upload */, FE14355D2EC446E90009D5AC /* Upload */,
FEE084F22EC172080045228E /* Schemas */, FEE084F22EC172080045228E /* Schemas */,
FEB3BA112EBD52860081A5EB /* Schemas */, FEB3BA112EBD52860081A5EB /* Schemas */,
@ -373,6 +379,7 @@
B231F52D2E93A44A00BC45D1 /* Core */, B231F52D2E93A44A00BC45D1 /* Core */,
B2CF7F8C2DDE4EBB00744BF6 /* Sync */, B2CF7F8C2DDE4EBB00744BF6 /* Sync */,
FE14355D2EC446E90009D5AC /* Upload */, FE14355D2EC446E90009D5AC /* Upload */,
FEA74CE22ED223690014C832 /* Repositories */,
FEB3BA112EBD52860081A5EB /* Schemas */, FEB3BA112EBD52860081A5EB /* Schemas */,
FEE084F22EC172080045228E /* Schemas */, FEE084F22EC172080045228E /* Schemas */,
); );

View File

@ -1,5 +1,6 @@
import BackgroundTasks import BackgroundTasks
import Flutter import Flutter
import SQLiteData
import UIKit import UIKit
import network_info_plus import network_info_plus
import path_provider_foundation import path_provider_foundation
@ -9,6 +10,8 @@ import shared_preferences_foundation
@main @main
@objc class AppDelegate: FlutterAppDelegate { @objc class AppDelegate: FlutterAppDelegate {
private var backgroundCompletionHandlers: [String: () -> Void] = [:]
override func application( override func application(
_ application: UIApplication, _ application: UIApplication,
didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]? didFinishLaunchingWithOptions launchOptions: [UIApplication.LaunchOptionsKey: Any]?
@ -53,18 +56,46 @@ import shared_preferences_foundation
return super.application(application, didFinishLaunchingWithOptions: launchOptions) return super.application(application, didFinishLaunchingWithOptions: launchOptions)
} }
override func application(
_ application: UIApplication,
handleEventsForBackgroundURLSession identifier: String,
completionHandler: @escaping () -> Void
) {
backgroundCompletionHandlers[identifier] = completionHandler
}
func completionHandler(forSession identifier: String) -> (() -> Void)? {
return backgroundCompletionHandlers.removeValue(forKey: identifier)
}
public static func registerPlugins(with engine: FlutterEngine) { public static func registerPlugins(with engine: FlutterEngine) {
NativeSyncApiImpl.register(with: engine.registrar(forPlugin: NativeSyncApiImpl.name)!) NativeSyncApiImpl.register(with: engine.registrar(forPlugin: NativeSyncApiImpl.name)!)
ThumbnailApiSetup.setUp(binaryMessenger: engine.binaryMessenger, api: ThumbnailApiImpl()) ThumbnailApiSetup.setUp(binaryMessenger: engine.binaryMessenger, api: ThumbnailApiImpl())
BackgroundWorkerFgHostApiSetup.setUp(binaryMessenger: engine.binaryMessenger, api: BackgroundWorkerApiImpl()) BackgroundWorkerFgHostApiSetup.setUp(binaryMessenger: engine.binaryMessenger, api: BackgroundWorkerApiImpl())
let statusListener = StatusEventListener() let statusListener = StatusEventListener()
StreamStatusStreamHandler.register(with: engine.binaryMessenger, streamHandler: statusListener) StreamStatusStreamHandler.register(with: engine.binaryMessenger, streamHandler: statusListener)
let progressListener = ProgressEventListener() let progressListener = ProgressEventListener()
StreamProgressStreamHandler.register(with: engine.binaryMessenger, streamHandler: progressListener) StreamProgressStreamHandler.register(with: engine.binaryMessenger, streamHandler: progressListener)
let dbUrl = try! FileManager.default.url(
for: .documentDirectory,
in: .userDomainMask,
appropriateFor: nil,
create: true
).appendingPathComponent("immich.sqlite")
let db = try! DatabasePool(path: dbUrl.path)
let storeRepository = StoreRepository(db: db)
let taskRepository = TaskRepository(db: db)
UploadApiSetup.setUp( UploadApiSetup.setUp(
binaryMessenger: engine.binaryMessenger, binaryMessenger: engine.binaryMessenger,
api: UploadApiImpl(statusListener: statusListener, progressListener: progressListener) api: UploadApiImpl(
storeRepository: storeRepository,
taskRepository: taskRepository,
statusListener: statusListener,
progressListener: progressListener
)
) )
} }

View File

@ -29,15 +29,15 @@ class BackgroundWorkerApiImpl: BackgroundWorkerFgHostApi {
public static func registerBackgroundWorkers() { public static func registerBackgroundWorkers() {
BGTaskScheduler.shared.register( BGTaskScheduler.shared.register(
forTaskWithIdentifier: processingTaskID, using: nil) { task in forTaskWithIdentifier: processingTaskID, using: nil) { task in
if task is BGProcessingTask { if case let task as BGProcessingTask = task {
handleBackgroundProcessing(task: task as! BGProcessingTask) handleBackgroundProcessing(task: task)
} }
} }
BGTaskScheduler.shared.register( BGTaskScheduler.shared.register(
forTaskWithIdentifier: refreshTaskID, using: nil) { task in forTaskWithIdentifier: refreshTaskID, using: nil) { task in
if task is BGAppRefreshTask { if case let task as BGAppRefreshTask = task {
handleBackgroundRefresh(task: task as! BGAppRefreshTask) handleBackgroundRefresh(task: task)
} }
} }
} }

View File

@ -1,5 +1,86 @@
import SQLiteData import SQLiteData
protocol StoreProtocol {
func get<T: StoreConvertible<Int>>(_ key: StoreKey.Typed<T>) throws -> T?
func get<T: StoreConvertible<String>>(_ key: StoreKey.Typed<T>) throws -> T?
func set<T: StoreConvertible<Int>>(_ key: StoreKey.Typed<T>, value: T) throws
func set<T: StoreConvertible<String>>(_ key: StoreKey.Typed<T>, value: T) throws
func invalidateCache()
}
protocol StoreConvertible<StorageType> {
associatedtype StorageType
static var cacheKeyPath: ReferenceWritableKeyPath<StoreCache, [StoreKey: Self]> { get }
static func fromValue(_ value: StorageType) throws(StoreError) -> Self
static func toValue(_ value: Self) throws(StoreError) -> StorageType
}
final class StoreRepository: StoreProtocol {
private let db: DatabasePool
private static let cache = StoreCache()
private static var lock = os_unfair_lock()
init(db: DatabasePool) {
self.db = db
}
func get<T: StoreConvertible<Int>>(_ key: StoreKey.Typed<T>) throws -> T? {
os_unfair_lock_lock(&Self.lock)
defer { os_unfair_lock_unlock(&Self.lock) }
let cached = Self.cache.get(key)
if _fastPath(cached != nil) { return cached! }
return try db.read { conn in
let query = Store.select(\.intValue).where { $0.id.eq(key.rawValue) }
if let value = try query.fetchOne(conn) ?? nil {
let converted = try T.fromValue(value)
Self.cache.set(key, value: converted)
return converted
}
return nil
}
}
func get<T: StoreConvertible<String>>(_ key: StoreKey.Typed<T>) throws -> T? {
os_unfair_lock_lock(&Self.lock)
defer { os_unfair_lock_unlock(&Self.lock) }
let cached = Self.cache.get(key)
if _fastPath(cached != nil) { return cached! }
return try db.read { conn in
let query = Store.select(\.stringValue).where { $0.id.eq(key.rawValue) }
if let value = try query.fetchOne(conn) ?? nil {
let converted = try T.fromValue(value)
Self.cache.set(key, value: converted)
return converted
}
return nil
}
}
func set<T: StoreConvertible<Int>>(_ key: StoreKey.Typed<T>, value: T) throws {
os_unfair_lock_lock(&Self.lock)
defer { os_unfair_lock_unlock(&Self.lock) }
let converted = try T.toValue(value)
try db.write { conn in
try Store.upsert { Store(id: key.rawValue, stringValue: nil, intValue: converted) }.execute(conn)
}
Self.cache.set(key, value: value)
}
func set<T: StoreConvertible<String>>(_ key: StoreKey.Typed<T>, value: T) throws {
os_unfair_lock_lock(&Self.lock)
defer { os_unfair_lock_unlock(&Self.lock) }
let converted = try T.toValue(value)
try db.write { conn in
try Store.upsert { Store(id: key.rawValue, stringValue: converted, intValue: nil) }.execute(conn)
}
Self.cache.set(key, value: value)
}
func invalidateCache() {
Self.cache.reset()
}
}
enum StoreError: Error { enum StoreError: Error {
case invalidJSON(String) case invalidJSON(String)
case invalidURL(String) case invalidURL(String)
@ -7,29 +88,21 @@ enum StoreError: Error {
case notFound case notFound
} }
protocol StoreConvertible {
static var cacheKeyPath: ReferenceWritableKeyPath<StoreCache, [StoreKey: Self]> { get }
associatedtype StorageType
static func fromValue(_ value: StorageType) throws(StoreError) -> Self
static func toValue(_ value: Self) throws(StoreError) -> StorageType
}
extension StoreConvertible { extension StoreConvertible {
static func get(_ cache: StoreCache, key: StoreKey) -> Self? { fileprivate static func get(_ cache: StoreCache, key: StoreKey) -> Self? {
os_unfair_lock_lock(&cache.lock)
defer { os_unfair_lock_unlock(&cache.lock) }
return cache[keyPath: cacheKeyPath][key] return cache[keyPath: cacheKeyPath][key]
} }
static func set(_ cache: StoreCache, key: StoreKey, value: Self?) { fileprivate static func set(_ cache: StoreCache, key: StoreKey, value: Self?) {
os_unfair_lock_lock(&cache.lock)
defer { os_unfair_lock_unlock(&cache.lock) }
cache[keyPath: cacheKeyPath][key] = value cache[keyPath: cacheKeyPath][key] = value
} }
fileprivate static func reset(_ cache: StoreCache) {
cache.reset()
}
} }
final class StoreCache { final class StoreCache {
fileprivate var lock = os_unfair_lock()
fileprivate var intCache: [StoreKey: Int] = [:] fileprivate var intCache: [StoreKey: Int] = [:]
fileprivate var boolCache: [StoreKey: Bool] = [:] fileprivate var boolCache: [StoreKey: Bool] = [:]
fileprivate var dateCache: [StoreKey: Date] = [:] fileprivate var dateCache: [StoreKey: Date] = [:]
@ -39,11 +112,21 @@ final class StoreCache {
fileprivate var stringDictCache: [StoreKey: [String: String]] = [:] fileprivate var stringDictCache: [StoreKey: [String: String]] = [:]
func get<T: StoreConvertible>(_ key: StoreKey.Typed<T>) -> T? { func get<T: StoreConvertible>(_ key: StoreKey.Typed<T>) -> T? {
T.get(self, key: key.rawValue) return T.get(self, key: key.rawValue)
} }
func set<T: StoreConvertible>(_ key: StoreKey.Typed<T>, value: T?) { func set<T: StoreConvertible>(_ key: StoreKey.Typed<T>, value: T?) {
T.set(self, key: key.rawValue, value: value) return T.set(self, key: key.rawValue, value: value)
}
func reset() {
intCache.removeAll(keepingCapacity: true)
boolCache.removeAll(keepingCapacity: true)
dateCache.removeAll(keepingCapacity: true)
stringCache.removeAll(keepingCapacity: true)
urlCache.removeAll(keepingCapacity: true)
endpointArrayCache.removeAll(keepingCapacity: true)
stringDictCache.removeAll(keepingCapacity: true)
} }
} }
@ -82,7 +165,7 @@ extension URL: StoreConvertible {
static func toValue(_ value: URL) -> String { value.absoluteString } static func toValue(_ value: URL) -> String { value.absoluteString }
} }
extension StoreConvertible where Self: Codable, StorageType == String { extension StoreConvertible<String> where Self: Codable {
static var jsonDecoder: JSONDecoder { JSONDecoder() } static var jsonDecoder: JSONDecoder { JSONDecoder() }
static var jsonEncoder: JSONEncoder { JSONEncoder() } static var jsonEncoder: JSONEncoder { JSONEncoder() }
@ -118,43 +201,3 @@ extension Dictionary: StoreConvertible where Key == String, Value == String {
static let cacheKeyPath = \StoreCache.stringDictCache static let cacheKeyPath = \StoreCache.stringDictCache
typealias StorageType = String typealias StorageType = String
} }
extension Store {
static let cache = StoreCache()
static func get<T: StoreConvertible>(_ conn: Database, _ key: StoreKey.Typed<T>) throws -> T?
where T.StorageType == Int {
if let cached = cache.get(key) { return cached }
let query = Store.select(\.intValue).where { $0.id.eq(key.rawValue) }
if let value = try query.fetchOne(conn) ?? nil {
let converted = try T.fromValue(value)
cache.set(key, value: converted)
}
return nil
}
static func get<T: StoreConvertible>(_ conn: Database, _ key: StoreKey.Typed<T>) throws -> T?
where T.StorageType == String {
if let cached = cache.get(key) { return cached }
let query = Store.select(\.stringValue).where { $0.id.eq(key.rawValue) }
if let value = try query.fetchOne(conn) ?? nil {
let converted = try T.fromValue(value)
cache.set(key, value: converted)
}
return nil
}
static func set<T: StoreConvertible>(_ conn: Database, _ key: StoreKey.Typed<T>, value: T) throws
where T.StorageType == Int {
let converted = try T.toValue(value)
try Store.upsert { Store(id: key.rawValue, stringValue: nil, intValue: converted) }.execute(conn)
cache.set(key, value: value)
}
static func set<T: StoreConvertible>(_ conn: Database, _ key: StoreKey.Typed<T>, value: T) throws
where T.StorageType == String {
let converted = try T.toValue(value)
try Store.upsert { Store(id: key.rawValue, stringValue: converted, intValue: nil) }.execute(conn)
cache.set(key, value: value)
}
}

View File

@ -0,0 +1,279 @@
import SQLiteData
protocol TaskProtocol {
func getTaskIds(status: TaskStatus) async throws -> [Int64]
func getBackupCandidates() async throws -> [LocalAssetCandidate]
func getBackupCandidates(ids: [String]) async throws -> [LocalAssetCandidate]
func getDownloadTasks() async throws -> [LocalAssetDownloadData]
func getUploadTasks() async throws -> [LocalAssetUploadData]
func markOrphansPending(ids: [Int64]) async throws
func markDownloadQueued(taskId: Int64, isLivePhoto: Bool, filePath: URL) async throws
func markUploadQueued(taskId: Int64) async throws
func markDownloadComplete(taskId: Int64, localId: String, hash: String?) async throws -> TaskStatus
func markUploadSuccess(taskId: Int64, livePhotoVideoId: String?) async throws
func retryOrFail(taskId: Int64, code: UploadErrorCode, status: TaskStatus) async throws
func enqueue(assets: [LocalAssetCandidate], imagePriority: Float, videoPriority: Float) async throws
func enqueue(files: [String]) async throws
func resolveError(code: UploadErrorCode) async throws
func getFilename(taskId: Int64) async throws -> String?
}
final class TaskRepository: TaskProtocol {
private let db: DatabasePool
init(db: DatabasePool) {
self.db = db
}
func getTaskIds(status: TaskStatus) async throws -> [Int64] {
return try await db.read { conn in
try UploadTask.select(\.id).where { $0.status.eq(status) }.fetchAll(conn)
}
}
func getBackupCandidates() async throws -> [LocalAssetCandidate] {
return try await db.read { conn in
return try LocalAsset.backupCandidates.fetchAll(conn)
}
}
func getBackupCandidates(ids: [String]) async throws -> [LocalAssetCandidate] {
return try await db.read { conn in
return try LocalAsset.backupCandidates.where { $0.id.in(ids) }.fetchAll(conn)
}
}
func getDownloadTasks() async throws -> [LocalAssetDownloadData] {
return try await db.read({ conn in
return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) }
.where { task, _ in task.canRetry && task.noFatalError && LocalAsset.withChecksum.exists() }
.select { task, asset in
LocalAssetDownloadData.Columns(
checksum: asset.checksum,
createdAt: asset.createdAt,
filename: asset.name,
livePhotoVideoId: task.livePhotoVideoId,
localId: asset.id,
taskId: task.id,
updatedAt: asset.updatedAt
)
}
.order { task, asset in (task.priority.desc(), task.createdAt) }
.limit { _, _ in UploadTaskStat.availableDownloadSlots }
.fetchAll(conn)
})
}
func getUploadTasks() async throws -> [LocalAssetUploadData] {
return try await db.read({ conn in
return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) }
.where { task, _ in task.canRetry && task.noFatalError && LocalAsset.withChecksum.exists() }
.select { task, asset in
LocalAssetUploadData.Columns(
filename: asset.name,
filePath: task.filePath.unwrapped,
priority: task.priority,
taskId: task.id,
type: asset.type
)
}
.order { task, asset in (task.priority.desc(), task.createdAt) }
.limit { task, _ in UploadTaskStat.availableUploadSlots }
.fetchAll(conn)
})
}
func markOrphansPending(ids: [Int64]) async throws {
try await db.write { conn in
try UploadTask.update {
$0.filePath = nil
$0.status = .downloadPending
}
.where { row in row.status.in([TaskStatus.downloadQueued, TaskStatus.uploadPending]) || row.id.in(ids) }
.execute(conn)
}
}
func markDownloadQueued(taskId: Int64, isLivePhoto: Bool, filePath: URL) async throws {
try await db.write { conn in
try UploadTask.update {
$0.status = .downloadQueued
$0.isLivePhoto = isLivePhoto
$0.filePath = filePath
}
.where { $0.id.eq(taskId) }.execute(conn)
}
}
func markUploadQueued(taskId: Int64) async throws {
try await db.write { conn in
try UploadTask.update { row in
row.status = .uploadQueued
row.filePath = nil
}
.where { $0.id.eq(taskId) }.execute(conn)
}
}
func markDownloadComplete(taskId: Int64, localId: String, hash: String?) async throws -> TaskStatus {
return try await db.write { conn in
if let hash {
try LocalAsset.update { $0.checksum = hash }.where { $0.id.eq(localId) }.execute(conn)
}
let status =
if let hash, try RemoteAsset.select(\.rowid).where({ $0.checksum.eq(hash) }).fetchOne(conn) != nil {
TaskStatus.uploadSkipped
} else {
TaskStatus.uploadPending
}
try UploadTask.update { $0.status = status }.where { $0.id.eq(taskId) }.execute(conn)
return status
}
}
func markUploadSuccess(taskId: Int64, livePhotoVideoId: String?) async throws {
try await db.write { conn in
let task =
try UploadTask
.update { $0.status = .uploadComplete }
.where { $0.id.eq(taskId) }
.returning(\.self)
.fetchOne(conn)
guard let task, let localId = task.localId, let isLivePhoto = task.isLivePhoto, isLivePhoto,
task.livePhotoVideoId == nil
else { return }
try UploadTask.insert {
UploadTask.Draft(
attempts: 0,
createdAt: Date(),
filePath: nil,
isLivePhoto: true,
lastError: nil,
livePhotoVideoId: livePhotoVideoId,
localId: localId,
method: .multipart,
priority: 0.7,
retryAfter: nil,
status: .downloadPending,
)
}.execute(conn)
}
}
func retryOrFail(taskId: Int64, code: UploadErrorCode, status: TaskStatus) async throws {
try await db.write { conn in
try UploadTask.update { row in
let retryOffset =
switch code {
case .iCloudThrottled, .iCloudRateLimit, .notEnoughSpace: 3000
default: 0
}
row.status = Case()
.when(row.localId.is(nil) && row.attempts.lte(TaskConfig.maxRetries), then: TaskStatus.uploadPending)
.when(row.attempts.lte(TaskConfig.maxRetries), then: TaskStatus.downloadPending)
.else(status)
row.attempts += 1
row.lastError = code
row.retryAfter = #sql("unixepoch('now') + (\(4 << row.attempts)) + \(retryOffset)")
}
.where { $0.id.eq(taskId) }.execute(conn)
}
}
func enqueue(assets: [LocalAssetCandidate], imagePriority: Float, videoPriority: Float) async throws {
try await db.write { conn in
var draft = draftStub
for candidate in assets {
draft.localId = candidate.id
draft.priority = candidate.type == .image ? imagePriority : videoPriority
try UploadTask.insert {
draft
} onConflict: {
($0.localId, $0.livePhotoVideoId)
}
.execute(conn)
}
}
}
func enqueue(files: [String]) async throws {
try await db.write { conn in
var draft = draftStub
draft.priority = 1.0
draft.status = .uploadPending
for file in files {
draft.filePath = URL(fileURLWithPath: file, isDirectory: false)
try UploadTask.insert { draft }.execute(conn)
}
}
}
func resolveError(code: UploadErrorCode) async throws {
try await db.write { conn in
try UploadTask.update { $0.lastError = nil }.where { $0.lastError.unwrapped.eq(code) }.execute(conn)
}
}
func getFilename(taskId: Int64) async throws -> String? {
try await db.read { conn in
try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) }.select(\.1.name).fetchOne(conn)
}
}
private var draftStub: UploadTask.Draft {
.init(
attempts: 0,
createdAt: Date(),
filePath: nil,
isLivePhoto: nil,
lastError: nil,
livePhotoVideoId: nil,
localId: nil,
method: .multipart,
priority: 0.5,
retryAfter: nil,
status: .downloadPending,
)
}
}
extension UploadTask.TableColumns {
var noFatalError: some QueryExpression<Bool> { lastError.is(nil) || !lastError.unwrapped.in(UploadErrorCode.fatal) }
var canRetry: some QueryExpression<Bool> {
attempts.lte(TaskConfig.maxRetries) && (retryAfter.is(nil) || retryAfter.unwrapped <= Date().unixTime)
}
}
extension LocalAlbum {
static let selected = Self.where { $0.backupSelection.eq(BackupSelection.selected) }
static let excluded = Self.where { $0.backupSelection.eq(BackupSelection.excluded) }
}
extension LocalAlbumAsset {
static let selected = Self.where {
$0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.selected.select(\.id))
}
static let excluded = Self.where {
$0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.excluded.select(\.id))
}
}
extension RemoteAsset {
static let currentUser = Self.where { _ in
ownerId.eq(Store.select(\.stringValue).where { $0.id.eq(StoreKey.currentUser.rawValue) }.unwrapped)
}
}
extension LocalAsset {
static let withChecksum = Self.where { $0.checksum.isNot(nil) }
static let shouldBackup = Self.where { _ in LocalAlbumAsset.selected.exists() && !LocalAlbumAsset.excluded.exists() }
static let notBackedUp = Self.where { local in
!RemoteAsset.currentUser.where { remote in local.checksum.eq(remote.checksum) }.exists()
}
static let backupCandidates = Self
.shouldBackup
.notBackedUp
.where { local in !UploadTask.where { $0.localId.eq(local.id) }.exists() }
.select { LocalAssetCandidate.Columns(id: $0.id, type: $0.type) }
.limit { _ in UploadTaskStat.availableSlots }
}

View File

@ -2,6 +2,8 @@ import SQLiteData
extension Notification.Name { extension Notification.Name {
static let networkDidConnect = Notification.Name("networkDidConnect") static let networkDidConnect = Notification.Name("networkDidConnect")
static let downloadTaskDidComplete = Notification.Name("downloadTaskDidComplete")
static let uploadTaskDidComplete = Notification.Name("uploadTaskDidComplete")
} }
enum TaskConfig { enum TaskConfig {
@ -12,6 +14,7 @@ enum TaskConfig {
static let sessionId = "app.mertalev.immich.upload" static let sessionId = "app.mertalev.immich.upload"
static let downloadCheckIntervalNs: UInt64 = 30_000_000_000 // 30 seconds static let downloadCheckIntervalNs: UInt64 = 30_000_000_000 // 30 seconds
static let downloadTimeoutS = TimeInterval(60) static let downloadTimeoutS = TimeInterval(60)
static let progressThrottleInterval = TimeInterval(0.1)
static let transferSpeedAlpha = 0.4 static let transferSpeedAlpha = 0.4
static let originalsDir = FileManager.default.temporaryDirectory.appendingPathComponent( static let originalsDir = FileManager.default.temporaryDirectory.appendingPathComponent(
"originals", "originals",
@ -214,6 +217,18 @@ enum UploadError: Error {
case fileCreationFailed case fileCreationFailed
case iCloudError(UploadErrorCode) case iCloudError(UploadErrorCode)
case photosError(UploadErrorCode) case photosError(UploadErrorCode)
case unknown
var code: UploadErrorCode {
switch self {
case .iCloudError(let code), .photosError(let code):
return code
case .unknown:
return .unknown
case .fileCreationFailed:
return .writeFailed
}
}
} }
enum UploadErrorCode: Int, QueryBindable { enum UploadErrorCode: Int, QueryBindable {
@ -228,9 +243,6 @@ enum UploadErrorCode: Int, QueryBindable {
case networkError case networkError
case photosInternalError case photosInternalError
case photosUnknownError case photosUnknownError
case noServerUrl
case noDeviceId
case noAccessToken
case interrupted case interrupted
case cancelled case cancelled
case downloadStalled case downloadStalled
@ -243,6 +255,9 @@ enum UploadErrorCode: Int, QueryBindable {
case invalidResponse case invalidResponse
case badRequest case badRequest
case internalServerError case internalServerError
case unauthorized
static let fatal: [UploadErrorCode] = [.assetNotFound, .resourceNotFound, .invalidResource, .badRequest, .unauthorized]
} }
enum AssetType: Int, QueryBindable { enum AssetType: Int, QueryBindable {

View File

@ -73,11 +73,6 @@ struct LocalAlbum: Identifiable {
let updatedAt: Date let updatedAt: Date
} }
extension LocalAlbum {
static let selected = Self.where { $0.backupSelection.eq(BackupSelection.selected) }
static let excluded = Self.where { $0.backupSelection.eq(BackupSelection.excluded) }
}
@Table("local_album_asset_entity") @Table("local_album_asset_entity")
struct LocalAlbumAsset { struct LocalAlbumAsset {
let id: ID let id: ID
@ -93,15 +88,6 @@ struct LocalAlbumAsset {
} }
} }
extension LocalAlbumAsset {
static let selected = Self.where {
$0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.selected.select(\.id))
}
static let excluded = Self.where {
$0.id.assetId.eq(LocalAsset.columns.id) && $0.id.albumId.in(LocalAlbum.excluded.select(\.id))
}
}
@Table("local_asset_entity") @Table("local_asset_entity")
struct LocalAsset: Identifiable { struct LocalAsset: Identifiable {
let id: String let id: String
@ -119,18 +105,6 @@ struct LocalAsset: Identifiable {
@Column("updated_at") @Column("updated_at")
let updatedAt: String let updatedAt: String
let width: Int? let width: Int?
static func getCandidates() -> Where<LocalAsset> {
return Self.where { local in
LocalAlbumAsset.selected.exists()
&& !LocalAlbumAsset.excluded.exists()
&& !RemoteAsset.where {
local.checksum.eq($0.checksum)
&& $0.ownerId.eq(Store.select(\.stringValue).where { $0.id.eq(StoreKey.currentUser.rawValue) }.unwrapped)
}.exists()
&& !UploadTask.where { $0.localId.eq(local.id) }.exists()
}
}
} }
@Selection @Selection
@ -143,6 +117,7 @@ struct LocalAssetCandidate {
struct LocalAssetDownloadData { struct LocalAssetDownloadData {
let checksum: String? let checksum: String?
let createdAt: String let createdAt: String
let filename: String
let livePhotoVideoId: RemoteAsset.ID? let livePhotoVideoId: RemoteAsset.ID?
let localId: LocalAsset.ID let localId: LocalAsset.ID
let taskId: UploadTask.ID let taskId: UploadTask.ID
@ -151,6 +126,7 @@ struct LocalAssetDownloadData {
@Selection @Selection
struct LocalAssetUploadData { struct LocalAssetUploadData {
let filename: String
let filePath: URL let filePath: URL
let priority: Float let priority: Float
let taskId: UploadTask.ID let taskId: UploadTask.ID
@ -375,16 +351,7 @@ struct UploadTask: Identifiable {
var priority: Float var priority: Float
@Column("retry_after", as: Date?.UnixTimeRepresentation.self) @Column("retry_after", as: Date?.UnixTimeRepresentation.self)
let retryAfter: Date? let retryAfter: Date?
let status: TaskStatus var status: TaskStatus
static func retryOrFail(code: UploadErrorCode, status: TaskStatus) -> Update<UploadTask, ()> {
return Self.update { row in
row.status = Case().when(row.attempts.lte(TaskConfig.maxRetries), then: TaskStatus.downloadPending).else(status)
row.attempts += 1
row.lastError = code
row.retryAfter = #sql("unixepoch('now') + (\(4 << row.attempts))")
}
}
} }
@Table("upload_task_stats") @Table("upload_task_stats")

View File

@ -0,0 +1,62 @@
import StructuredFieldValues
struct AssetData: StructuredFieldValue {
static let structuredFieldType: StructuredFieldType = .dictionary
let deviceAssetId: String
let deviceId: String
let fileCreatedAt: String
let fileModifiedAt: String
let fileName: String
let isFavorite: Bool
let livePhotoVideoId: String?
static let boundary = "Boundary-\(UUID().uuidString)"
static let deviceAssetIdField = "--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceAssetId\"\r\n\r\n"
.data(using: .utf8)!
static let deviceIdField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceId\"\r\n\r\n"
.data(using: .utf8)!
static let fileCreatedAtField =
"\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileCreatedAt\"\r\n\r\n"
.data(using: .utf8)!
static let fileModifiedAtField =
"\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileModifiedAt\"\r\n\r\n"
.data(using: .utf8)!
static let isFavoriteField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"isFavorite\"\r\n\r\n"
.data(using: .utf8)!
static let livePhotoVideoIdField =
"\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"livePhotoVideoId\"\r\n\r\n"
.data(using: .utf8)!
static let trueData = "true".data(using: .utf8)!
static let falseData = "false".data(using: .utf8)!
static let footer = "\r\n--\(boundary)--\r\n".data(using: .utf8)!
static let contentType = "multipart/form-data; boundary=\(boundary)"
func multipart() -> (Data, Data) {
var header = Data()
header.append(Self.deviceAssetIdField)
header.append(deviceAssetId.data(using: .utf8)!)
header.append(Self.deviceIdField)
header.append(deviceId.data(using: .utf8)!)
header.append(Self.fileCreatedAtField)
header.append(fileCreatedAt.data(using: .utf8)!)
header.append(Self.fileModifiedAtField)
header.append(fileModifiedAt.data(using: .utf8)!)
header.append(Self.isFavoriteField)
header.append(isFavorite ? Self.trueData : Self.falseData)
if let livePhotoVideoId {
header.append(Self.livePhotoVideoIdField)
header.append(livePhotoVideoId.data(using: .utf8)!)
}
header.append(
"\r\n--\(Self.boundary)\r\nContent-Disposition: form-data; name=\"assetData\"; filename=\"\(fileName)\"\r\nContent-Type: application/octet-stream\r\n\r\n"
.data(using: .utf8)!
)
return (header, Self.footer)
}
}

View File

@ -1,19 +1,33 @@
import SQLiteData import SQLiteData
class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegate { private let stateLock = NSLock()
private static let stateLock = NSLock() private var transferStates: [Int64: NetworkTransferState] = [:]
private static var transferStates: [Int64: NetworkTransferState] = [:] private var responseData: [Int64: Data] = [:]
private static var responseData: [Int64: Data] = [:] private let jsonDecoder = JSONDecoder()
private static let jsonDecoder = JSONDecoder()
private let db: DatabasePool private class NetworkTransferState {
private let statusListener: StatusEventListener var lastUpdateTime: Date
private let progressListener: ProgressEventListener var totalBytesTransferred: Int64
weak var downloadQueue: DownloadQueue? var currentSpeed: Double?
weak var uploadQueue: UploadQueue?
init(db: DatabasePool, statusListener: StatusEventListener, progressListener: ProgressEventListener) { init(lastUpdateTime: Date, totalBytesTransferred: Int64, currentSpeed: Double?) {
self.db = db self.lastUpdateTime = lastUpdateTime
self.totalBytesTransferred = totalBytesTransferred
self.currentSpeed = currentSpeed
}
}
final class UploadApiDelegate<
TaskRepo: TaskProtocol,
StatusListener: TaskStatusListener,
ProgressListener: TaskProgressListener
>: NSObject, URLSessionDataDelegate, URLSessionTaskDelegate {
private let taskRepository: TaskRepo
private let statusListener: StatusListener
private let progressListener: ProgressListener
init(taskRepository: TaskRepo, statusListener: StatusListener, progressListener: ProgressListener) {
self.taskRepository = taskRepository
self.statusListener = statusListener self.statusListener = statusListener
self.progressListener = progressListener self.progressListener = progressListener
} }
@ -30,11 +44,11 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
let taskId = Int64(taskIdStr) let taskId = Int64(taskIdStr)
else { return } else { return }
Self.stateLock.withLock { stateLock.withLock {
if var response = Self.responseData[taskId] { if var response = responseData[taskId] {
response.append(data) response.append(data)
} else { } else {
Self.responseData[taskId] = data responseData[taskId] = data
} }
} }
} }
@ -42,8 +56,7 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) { func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
Task { Task {
defer { defer {
downloadQueue?.startQueueProcessing() NotificationCenter.default.post(name: .uploadTaskDidComplete, object: nil)
uploadQueue?.startQueueProcessing()
} }
guard let taskDescriptionId = task.taskDescription, guard let taskDescriptionId = task.taskDescription,
@ -53,25 +66,27 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
} }
defer { defer {
Self.stateLock.withLock { let _ = Self.transferStates.removeValue(forKey: taskId) } stateLock.withLock { let _ = transferStates.removeValue(forKey: taskId) }
} }
if let responseData = Self.stateLock.withLock({ Self.responseData.removeValue(forKey: taskId) }), if let body = stateLock.withLock({ responseData.removeValue(forKey: taskId) }),
let httpResponse = task.response as? HTTPURLResponse let response = task.response as? HTTPURLResponse
{ {
switch httpResponse.statusCode { switch response.statusCode {
case 200, 201: case 200, 201:
do { do {
let response = try Self.jsonDecoder.decode(UploadSuccessResponse.self, from: responseData) let response = try jsonDecoder.decode(UploadSuccessResponse.self, from: body)
return await handleSuccess(taskId: taskId, response: response) return await handleSuccess(taskId: taskId, response: response)
} catch { } catch {
return await handleFailure(taskId: taskId, code: .invalidResponse) return await handleFailure(taskId: taskId, code: .invalidResponse)
} }
case 401: return await handleFailure(taskId: taskId, code: .unauthorized)
case 400..<500: case 400..<500:
dPrint( dPrint("Response \(response.statusCode): \(String(data: body, encoding: .utf8) ?? "No response body")")
"Response \(httpResponse.statusCode): \(String(data: responseData, encoding: .utf8) ?? "No response body")"
)
return await handleFailure(taskId: taskId, code: .badRequest) return await handleFailure(taskId: taskId, code: .badRequest)
case 500..<600:
dPrint("Response \(response.statusCode): \(String(data: body, encoding: .utf8) ?? "No response body")")
return await handleFailure(taskId: taskId, code: .internalServerError)
default: default:
break break
} }
@ -111,8 +126,8 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
) { ) {
guard let sessionTaskId = task.taskDescription, let taskId = Int64(sessionTaskId) else { return } guard let sessionTaskId = task.taskDescription, let taskId = Int64(sessionTaskId) else { return }
let currentTime = Date() let currentTime = Date()
let state = Self.stateLock.withLock { let state = stateLock.withLock {
if let existing = Self.transferStates[taskId] { if let existing = transferStates[taskId] {
return existing return existing
} }
let new = NetworkTransferState( let new = NetworkTransferState(
@ -120,7 +135,7 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
totalBytesTransferred: totalBytesSent, totalBytesTransferred: totalBytesSent,
currentSpeed: nil currentSpeed: nil
) )
Self.transferStates[taskId] = new transferStates[taskId] = new
return new return new
} }
@ -147,30 +162,29 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
) )
} }
func urlSessionDidFinishEvents(forBackgroundURLSession session: URLSession) {
dPrint("All background events delivered for session: \(session.configuration.identifier ?? "unknown")")
DispatchQueue.main.async {
if let identifier = session.configuration.identifier,
let appDelegate = UIApplication.shared.delegate as? AppDelegate,
let completionHandler = appDelegate.completionHandler(forSession: identifier)
{
completionHandler()
}
}
}
private func handleSuccess(taskId: Int64, response: UploadSuccessResponse) async { private func handleSuccess(taskId: Int64, response: UploadSuccessResponse) async {
dPrint("Upload succeeded for task \(taskId), server ID: \(response.id)") dPrint("Upload succeeded for task \(taskId), server ID: \(response.id)")
do { do {
try await db.write { conn in try await taskRepository.markUploadSuccess(taskId: taskId, livePhotoVideoId: response.id)
let task = try UploadTask.update { $0.status = .uploadComplete }.where({ $0.id.eq(taskId) }) statusListener.onTaskStatus(
.returning(\.self).fetchOne(conn) UploadApiTaskStatus(
guard let task, let isLivePhoto = task.isLivePhoto, isLivePhoto, task.livePhotoVideoId == nil else { return } id: String(taskId),
try UploadTask.insert { filename: (try? await taskRepository.getFilename(taskId: taskId)) ?? "",
UploadTask.Draft( status: .uploadComplete
attempts: 0, )
createdAt: Date(), )
filePath: nil,
isLivePhoto: true,
lastError: nil,
livePhotoVideoId: response.id,
localId: task.localId,
method: .multipart,
priority: 0.7,
retryAfter: nil,
status: .downloadPending,
)
}.execute(conn)
}
dPrint("Updated upload success status for session task \(taskId)")
} catch { } catch {
dPrint( dPrint(
"Failed to update upload success status for session task \(taskId): \(error.localizedDescription)" "Failed to update upload success status for session task \(taskId): \(error.localizedDescription)"
@ -180,10 +194,14 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
private func handleFailure(taskId: Int64, code: UploadErrorCode = .unknown) async { private func handleFailure(taskId: Int64, code: UploadErrorCode = .unknown) async {
dPrint("Upload failed for task \(taskId) with code \(code)") dPrint("Upload failed for task \(taskId) with code \(code)")
try? await db.write { conn in try? await taskRepository.retryOrFail(taskId: taskId, code: code, status: .uploadFailed)
try UploadTask.retryOrFail(code: code, status: .uploadFailed).where { $0.id.eq(taskId) } statusListener.onTaskStatus(
.execute(conn) UploadApiTaskStatus(
} id: String(taskId),
filename: (try? await taskRepository.getFilename(taskId: taskId)) ?? "",
status: .uploadFailed
)
)
} }
@available(iOS 17, *) @available(iOS 17, *)
@ -193,16 +211,4 @@ class UploadApiDelegate: NSObject, URLSessionDataDelegate, URLSessionTaskDelegat
resumeTask.taskDescription = taskDescriptionId resumeTask.taskDescription = taskDescriptionId
resumeTask.resume() resumeTask.resume()
} }
private class NetworkTransferState {
var lastUpdateTime: Date
var totalBytesTransferred: Int64
var currentSpeed: Double?
init(lastUpdateTime: Date, totalBytesTransferred: Int64, currentSpeed: Double?) {
self.lastUpdateTime = lastUpdateTime
self.totalBytesTransferred = totalBytesTransferred
self.currentSpeed = currentSpeed
}
}
} }

View File

@ -1,131 +1,105 @@
import CryptoKit import CryptoKit
import Photos import Photos
import SQLiteData
class DownloadQueue { private var queueProcessingTask: Task<Void, Never>?
private static let resourceManager = PHAssetResourceManager.default() private var queueProcessingLock = NSLock()
private static var queueProcessingTask: Task<Void, Never>? private let resourceManager = PHAssetResourceManager.default()
private static var queueProcessingLock = NSLock()
private let db: DatabasePool private final class RequestRef {
private let uploadQueue: UploadQueue var id: PHAssetResourceDataRequestID?
private let statusListener: StatusEventListener var lastProgressTime = Date()
private let progressListener: ProgressEventListener var didStall = false
}
final class DownloadQueue<
StoreRepo: StoreProtocol,
TaskRepo: TaskProtocol,
StatusListener: TaskStatusListener,
ProgressListener: TaskProgressListener
> {
private let storeRepository: StoreRepo
private let taskRepository: TaskRepo
private let statusListener: StatusListener
private let progressListener: ProgressListener
private var uploadObserver: NSObjectProtocol?
private var networkObserver: NSObjectProtocol?
init( init(
db: DatabasePool, storeRepository: StoreRepo,
uploadQueue: UploadQueue, taskRepository: TaskRepo,
statusListener: StatusEventListener, statusListener: StatusListener,
progressListener: ProgressEventListener progressListener: ProgressListener
) { ) {
self.db = db self.storeRepository = storeRepository
self.uploadQueue = uploadQueue self.taskRepository = taskRepository
self.statusListener = statusListener self.statusListener = statusListener
self.progressListener = progressListener self.progressListener = progressListener
NotificationCenter.default.addObserver(forName: .networkDidConnect, object: nil, queue: nil) { [weak self] _ in uploadObserver = NotificationCenter.default.addObserver(forName: .uploadTaskDidComplete, object: nil, queue: nil) {
[weak self] _ in
self?.startQueueProcessing()
}
networkObserver = NotificationCenter.default.addObserver(forName: .networkDidConnect, object: nil, queue: nil) {
[weak self] _ in
dPrint("Network connected") dPrint("Network connected")
self?.startQueueProcessing() self?.startQueueProcessing()
} }
} }
deinit {
uploadObserver.map(NotificationCenter.default.removeObserver(_:))
networkObserver.map(NotificationCenter.default.removeObserver(_:))
}
func enqueueAssets(localIds: [String]) async throws { func enqueueAssets(localIds: [String]) async throws {
guard !localIds.isEmpty else { return dPrint("No assets to enqueue") } guard !localIds.isEmpty else { return dPrint("No assets to enqueue") }
defer { startQueueProcessing() } defer { startQueueProcessing() }
let candidates = try await db.read { conn in let candidates = try await taskRepository.getBackupCandidates(ids: localIds)
return try LocalAsset.all
.where { asset in asset.id.in(localIds) }
.select { LocalAssetCandidate.Columns(id: $0.id, type: $0.type) }
.limit { _ in UploadTaskStat.availableSlots }
.fetchAll(conn)
}
guard !candidates.isEmpty else { return dPrint("No candidates to enqueue") } guard !candidates.isEmpty else { return dPrint("No candidates to enqueue") }
try await db.write { conn in try await taskRepository.enqueue(assets: candidates, imagePriority: 0.9, videoPriority: 0.8)
var draft = UploadTask.Draft(
attempts: 0,
createdAt: Date(),
filePath: nil,
isLivePhoto: nil,
lastError: nil,
livePhotoVideoId: nil,
localId: "",
method: .multipart,
priority: 0.5,
retryAfter: nil,
status: .downloadPending,
)
for candidate in candidates {
draft.localId = candidate.id
draft.priority = candidate.type == .image ? 0.9 : 0.8
try UploadTask.insert {
draft
} onConflict: {
($0.localId, $0.livePhotoVideoId)
}.execute(conn)
}
}
dPrint("Enqueued \(candidates.count) assets for upload") dPrint("Enqueued \(candidates.count) assets for upload")
} }
func startQueueProcessing() { func startQueueProcessing() {
dPrint("Starting download queue processing") dPrint("Starting download queue processing")
Self.queueProcessingLock.withLock { queueProcessingLock.withLock {
guard Self.queueProcessingTask == nil else { return } guard queueProcessingTask == nil else { return }
Self.queueProcessingTask = Task { queueProcessingTask = Task {
await startDownloads() await startDownloads()
Self.queueProcessingLock.withLock { Self.queueProcessingTask = nil } queueProcessingLock.withLock { queueProcessingTask = nil }
} }
} }
} }
private func startDownloads() async { private func startDownloads() async {
dPrint("Processing download queue") dPrint("Processing download queue")
guard NetworkMonitor.shared.isConnected else {
return dPrint("Download queue paused: network disconnected") guard await UIApplication.shared.applicationState != .background else {
return dPrint("Not processing downloads in background") // TODO: run in processing tasks
} }
guard NetworkMonitor.shared.isConnected,
let backupEnabled = try? storeRepository.get(StoreKey.enableBackup), backupEnabled,
let deviceId = try? storeRepository.get(StoreKey.deviceId)
else { return dPrint("Download queue paused: missing preconditions") }
do { do {
let tasks: [LocalAssetDownloadData] = try await db.read({ conn in let tasks = try await taskRepository.getDownloadTasks()
guard let backupEnabled = try Store.get(conn, StoreKey.enableBackup), backupEnabled else { return [] }
return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) }
.where { task, asset in
asset.checksum.isNot(nil) && task.status.eq(TaskStatus.downloadPending)
&& task.attempts < TaskConfig.maxRetries
&& (task.retryAfter.is(nil) || task.retryAfter.unwrapped <= Date().unixTime)
&& (task.lastError.is(nil)
|| !task.lastError.unwrapped.in([
UploadErrorCode.assetNotFound, UploadErrorCode.resourceNotFound, UploadErrorCode.invalidResource,
]))
}
.select { task, asset in
LocalAssetDownloadData.Columns(
checksum: asset.checksum,
createdAt: asset.createdAt,
livePhotoVideoId: task.livePhotoVideoId,
localId: asset.id,
taskId: task.id,
updatedAt: asset.updatedAt
)
}
.order { task, asset in (task.priority.desc(), task.createdAt) }
.limit { _, _ in UploadTaskStat.availableDownloadSlots }
.fetchAll(conn)
})
if tasks.isEmpty { return dPrint("No download tasks to process") } if tasks.isEmpty { return dPrint("No download tasks to process") }
try await withThrowingTaskGroup(of: Void.self) { group in try await withThrowingTaskGroup(of: Void.self) { group in
var iterator = tasks.makeIterator() var iterator = tasks.makeIterator()
for _ in 0..<min(TaskConfig.maxActiveDownloads, tasks.count) { for _ in 0..<min(TaskConfig.maxActiveDownloads, tasks.count) {
if let task = iterator.next() { if let task = iterator.next() {
group.addTask { await self.downloadAndQueue(task) } group.addTask { await self.downloadAndQueue(task, deviceId: deviceId) }
} }
} }
while try await group.next() != nil { while try await group.next() != nil {
if let task = iterator.next() { if let task = iterator.next() {
group.addTask { await self.downloadAndQueue(task) } group.addTask { await self.downloadAndQueue(task, deviceId: deviceId) }
} }
} }
} }
@ -134,26 +108,21 @@ class DownloadQueue {
} }
} }
private func downloadAndQueue(_ task: LocalAssetDownloadData) async { private func downloadAndQueue(_ task: LocalAssetDownloadData, deviceId: String) async {
defer { startQueueProcessing() } defer { startQueueProcessing() }
dPrint("Starting download for task \(task.taskId)") dPrint("Starting download for task \(task.taskId)")
guard let asset = PHAsset.fetchAssets(withLocalIdentifiers: [task.localId], options: nil).firstObject guard let asset = PHAsset.fetchAssets(withLocalIdentifiers: [task.localId], options: nil).firstObject
else { else {
dPrint("Asset not found") dPrint("Asset not found")
return handleFailure(task: task, code: .assetNotFound) return await handleFailure(task: task, code: .assetNotFound)
} }
let isLivePhoto = asset.mediaSubtypes.contains(.photoLive) let isLivePhoto = asset.mediaSubtypes.contains(.photoLive)
let isMotion = isLivePhoto && task.livePhotoVideoId != nil let isMotion = isLivePhoto && task.livePhotoVideoId != nil
guard let resource = isMotion ? asset.getLivePhotoResource() : asset.getResource() else { guard let resource = isMotion ? asset.getLivePhotoResource() : asset.getResource() else {
dPrint("Resource not found") dPrint("Resource not found")
return handleFailure(task: task, code: .resourceNotFound) return await handleFailure(task: task, code: .resourceNotFound)
}
guard let deviceId = (try? await db.read { conn in try Store.get(conn, StoreKey.deviceId) }) else {
dPrint("Device ID not found")
return handleFailure(task: task, code: .noDeviceId)
} }
let fileDir = TaskConfig.originalsDir let fileDir = TaskConfig.originalsDir
@ -167,17 +136,11 @@ class DownloadQueue {
) )
} catch { } catch {
dPrint("Failed to create directory for download task \(task.taskId): \(error)") dPrint("Failed to create directory for download task \(task.taskId): \(error)")
return handleFailure(task: task, code: .writeFailed, filePath: filePath) return await handleFailure(task: task, code: .writeFailed, filePath: filePath)
} }
do { do {
try await db.write { conn in try await taskRepository.markDownloadQueued(taskId: task.taskId, isLivePhoto: isLivePhoto, filePath: filePath)
try UploadTask.update {
$0.status = .downloadQueued
$0.isLivePhoto = isLivePhoto
$0.filePath = filePath
}.where { $0.id.eq(task.taskId) }.execute(conn)
}
} catch { } catch {
return dPrint("Failed to set file path for download task \(task.taskId): \(error)") return dPrint("Failed to set file path for download task \(task.taskId): \(error)")
} }
@ -187,28 +150,21 @@ class DownloadQueue {
do { do {
let hash = try await download(task: task, asset: asset, resource: resource, to: filePath, deviceId: deviceId) let hash = try await download(task: task, asset: asset, resource: resource, to: filePath, deviceId: deviceId)
let status = try await db.write { conn in let status = try await taskRepository.markDownloadComplete(taskId: task.taskId, localId: task.localId, hash: hash)
if let hash { try LocalAsset.update { $0.checksum = hash }.where { $0.id.eq(task.localId) }.execute(conn) }
let status =
if let hash, try RemoteAsset.select(\.rowid).where({ $0.checksum.eq(hash) }).fetchOne(conn) != nil {
TaskStatus.uploadSkipped
} else {
TaskStatus.uploadPending
}
try UploadTask.update { $0.status = .uploadPending }.where { $0.id.eq(task.taskId) }.execute(conn)
return status
}
statusListener.onTaskStatus( statusListener.onTaskStatus(
UploadApiTaskStatus( UploadApiTaskStatus(
id: String(task.taskId), id: String(task.taskId),
filename: filePath.path, filename: task.filename,
status: UploadApiStatus(rawValue: status.rawValue)! status: UploadApiStatus(rawValue: status.rawValue)!
) )
) )
uploadQueue.startQueueProcessing() NotificationCenter.default.post(name: .downloadTaskDidComplete, object: nil)
} catch let error as UploadError {
dPrint("Download failed for task \(task.taskId): \(error)")
await handleFailure(task: task, code: error.code, filePath: filePath)
} catch { } catch {
dPrint("Download failed for task \(task.taskId): \(error)") dPrint("Download failed for task \(task.taskId): \(error)")
handleFailure(task: task, code: .writeFailed, filePath: filePath) await handleFailure(task: task, code: .unknown, filePath: filePath)
} }
} }
@ -240,18 +196,11 @@ class DownloadQueue {
} }
try fileHandle.write(contentsOf: header) try fileHandle.write(contentsOf: header)
class RequestRef {
var id: PHAssetResourceDataRequestID?
var lastProgressTime = Date()
var didStall = false
}
var lastProgressTime = Date() var lastProgressTime = Date()
nonisolated(unsafe) let progressListener = self.progressListener
let taskIdStr = String(task.taskId) let taskIdStr = String(task.taskId)
options.progressHandler = { progress in options.progressHandler = { progress in
lastProgressTime = Date() lastProgressTime = Date()
progressListener.onTaskProgress(UploadApiTaskProgress(id: taskIdStr, progress: progress)) self.progressListener.onTaskProgress(UploadApiTaskProgress(id: taskIdStr, progress: progress))
} }
let request = RequestRef() let request = RequestRef()
@ -261,7 +210,7 @@ class DownloadQueue {
request.didStall = Date().timeIntervalSince(lastProgressTime) > TaskConfig.downloadTimeoutS request.didStall = Date().timeIntervalSince(lastProgressTime) > TaskConfig.downloadTimeoutS
if request.didStall { if request.didStall {
if let requestId = request.id { if let requestId = request.id {
Self.resourceManager.cancelDataRequest(requestId) resourceManager.cancelDataRequest(requestId)
} }
break break
} }
@ -271,7 +220,7 @@ class DownloadQueue {
return try await withTaskCancellationHandler { return try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in try await withCheckedThrowingContinuation { continuation in
var hasher = task.checksum == nil && task.livePhotoVideoId == nil ? Insecure.SHA1() : nil var hasher = task.checksum == nil && task.livePhotoVideoId == nil ? Insecure.SHA1() : nil
request.id = Self.resourceManager.requestData( request.id = resourceManager.requestData(
for: resource, for: resource,
options: options, options: options,
dataReceivedHandler: { data in dataReceivedHandler: { data in
@ -281,7 +230,7 @@ class DownloadQueue {
try fileHandle.write(contentsOf: data) try fileHandle.write(contentsOf: data)
} catch { } catch {
request.id = nil request.id = nil
Self.resourceManager.cancelDataRequest(requestId) resourceManager.cancelDataRequest(requestId)
} }
}, },
completionHandler: { error in completionHandler: { error in
@ -295,7 +244,7 @@ class DownloadQueue {
case 81: .iCloudThrottled case 81: .iCloudThrottled
default: .photosUnknownError default: .photosUnknownError
} }
self.handleFailure(task: task, code: code, filePath: filePath) continuation.resume(throwing: UploadError.iCloudError(code))
case let e as PHPhotosError: case let e as PHPhotosError:
dPrint("Photos error during download: \(e)") dPrint("Photos error during download: \(e)")
let code: UploadErrorCode = let code: UploadErrorCode =
@ -310,10 +259,10 @@ class DownloadQueue {
case .userCancelled: .cancelled case .userCancelled: .cancelled
default: .photosUnknownError default: .photosUnknownError
} }
self.handleFailure(task: task, code: code, filePath: filePath) continuation.resume(throwing: UploadError.photosError(code))
case .some: case .some:
dPrint("Unknown error during download: \(String(describing: error))") dPrint("Unknown error during download: \(String(describing: error))")
self.handleFailure(task: task, code: .unknown, filePath: filePath) continuation.resume(throwing: UploadError.unknown)
case .none: case .none:
dPrint("Download completed for task \(task.taskId)") dPrint("Download completed for task \(task.taskId)")
do { do {
@ -321,7 +270,7 @@ class DownloadQueue {
continuation.resume(returning: hasher.map { hasher in Data(hasher.finalize()).base64EncodedString() }) continuation.resume(returning: hasher.map { hasher in Data(hasher.finalize()).base64EncodedString() })
} catch { } catch {
try? FileManager.default.removeItem(at: filePath) try? FileManager.default.removeItem(at: filePath)
continuation.resume(throwing: error) continuation.resume(throwing: UploadError.fileCreationFailed)
} }
} }
} }
@ -329,21 +278,26 @@ class DownloadQueue {
} }
} onCancel: { } onCancel: {
if let requestId = request.id { if let requestId = request.id {
Self.resourceManager.cancelDataRequest(requestId) resourceManager.cancelDataRequest(requestId)
} }
} }
} }
private func handleFailure(task: LocalAssetDownloadData, code: UploadErrorCode, filePath: URL? = nil) { private func handleFailure(task: LocalAssetDownloadData, code: UploadErrorCode, filePath: URL? = nil) async {
dPrint("Handling failure for task \(task.taskId) with code \(code.rawValue)") dPrint("Handling failure for task \(task.taskId) with code \(code.rawValue)")
do { do {
if let filePath { if let filePath {
try? FileManager.default.removeItem(at: filePath) try? FileManager.default.removeItem(at: filePath)
} }
try db.write { conn in try await taskRepository.retryOrFail(taskId: task.taskId, code: code, status: .downloadFailed)
try UploadTask.retryOrFail(code: code, status: .downloadFailed).where { $0.id.eq(task.taskId) }.execute(conn) statusListener.onTaskStatus(
} UploadApiTaskStatus(
id: String(task.taskId),
filename: task.filename,
status: .downloadFailed
)
)
} catch { } catch {
dPrint("Failed to update download failure status for task \(task.taskId): \(error)") dPrint("Failed to update download failure status for task \(task.taskId): \(error)")
} }

View File

@ -1,4 +1,12 @@
class StatusEventListener: StreamStatusStreamHandler { protocol TaskProgressListener {
func onTaskProgress(_ event: UploadApiTaskProgress)
}
protocol TaskStatusListener {
func onTaskStatus(_ event: UploadApiTaskStatus)
}
final class StatusEventListener: StreamStatusStreamHandler, TaskStatusListener, @unchecked Sendable {
var eventSink: PigeonEventSink<UploadApiTaskStatus>? var eventSink: PigeonEventSink<UploadApiTaskStatus>?
override func onListen(withArguments arguments: Any?, sink: PigeonEventSink<UploadApiTaskStatus>) { override func onListen(withArguments arguments: Any?, sink: PigeonEventSink<UploadApiTaskStatus>) {
@ -6,26 +14,7 @@ class StatusEventListener: StreamStatusStreamHandler {
} }
func onTaskStatus(_ event: UploadApiTaskStatus) { func onTaskStatus(_ event: UploadApiTaskStatus) {
if let eventSink = eventSink { if let eventSink {
eventSink.success(event)
}
}
func onEventsDone() {
eventSink?.endOfStream()
eventSink = nil
}
}
class ProgressEventListener: StreamProgressStreamHandler {
var eventSink: PigeonEventSink<UploadApiTaskProgress>?
override func onListen(withArguments arguments: Any?, sink: PigeonEventSink<UploadApiTaskProgress>) {
eventSink = sink
}
func onTaskProgress(_ event: UploadApiTaskProgress) {
if let eventSink = eventSink {
DispatchQueue.main.async { eventSink.success(event) } DispatchQueue.main.async { eventSink.success(event) }
} }
} }
@ -37,3 +26,40 @@ class ProgressEventListener: StreamProgressStreamHandler {
} }
} }
} }
final class ProgressEventListener: StreamProgressStreamHandler, TaskProgressListener, @unchecked Sendable {
var eventSink: PigeonEventSink<UploadApiTaskProgress>?
private var lastReportTimes: [String: Date] = [:]
private let lock = NSLock()
override func onListen(withArguments arguments: Any?, sink: PigeonEventSink<UploadApiTaskProgress>) {
eventSink = sink
}
func onTaskProgress(_ event: UploadApiTaskProgress) {
guard let eventSink,
lock.withLock({
let now = Date()
if let lastReport = lastReportTimes[event.id] {
guard now.timeIntervalSince(lastReport) >= TaskConfig.progressThrottleInterval else {
return false
}
}
lastReportTimes[event.id] = now
return true
})
else { return }
DispatchQueue.main.async { eventSink.success(event) }
}
func onEventsDone() {
DispatchQueue.main.async {
self.eventSink?.endOfStream()
self.eventSink = nil
self.lock.withLock {
self.lastReportTimes.removeAll()
}
}
}
}

View File

@ -1,6 +1,6 @@
import Network import Network
class NetworkMonitor { final class NetworkMonitor {
static let shared = NetworkMonitor() static let shared = NetworkMonitor()
private let monitor = NWPathMonitor() private let monitor = NWPathMonitor()
private(set) var isConnected = false private(set) var isConnected = false
@ -17,6 +17,6 @@ class NetworkMonitor {
NotificationCenter.default.post(name: .networkDidConnect, object: nil) NotificationCenter.default.post(name: .networkDidConnect, object: nil)
} }
} }
monitor.start(queue: .global(qos: .utility)) monitor.start(queue: .global(qos: .default))
} }
} }

View File

@ -1,28 +1,57 @@
import SQLiteData private var queueProcessingTask: Task<Void, Never>?
import StructuredFieldValues private var queueProcessingLock = NSLock()
class UploadQueue { final class UploadQueue<StoreRepo: StoreProtocol, TaskRepo: TaskProtocol, StatusListener: TaskStatusListener> {
private static let structuredEncoder = StructuredFieldValueEncoder() private let storeRepository: StoreRepo
private static var queueProcessingTask: Task<Void, Never>? private let taskRepository: TaskRepo
private static var queueProcessingLock = NSLock() private let statusListener: StatusListener
private let db: DatabasePool
private let cellularSession: URLSession private let cellularSession: URLSession
private let wifiOnlySession: URLSession private let wifiOnlySession: URLSession
private let statusListener: StatusEventListener private var uploadObserver: NSObjectProtocol?
private var downloadObserver: NSObjectProtocol?
private var networkObserver: NSObjectProtocol?
init(db: DatabasePool, cellularSession: URLSession, wifiOnlySession: URLSession, statusListener: StatusEventListener) init(
{ storeRepository: StoreRepo,
self.db = db taskRepository: TaskRepo,
statusListener: StatusListener,
cellularSession: URLSession,
wifiOnlySession: URLSession
) {
self.storeRepository = storeRepository
self.taskRepository = taskRepository
self.cellularSession = cellularSession self.cellularSession = cellularSession
self.wifiOnlySession = wifiOnlySession self.wifiOnlySession = wifiOnlySession
self.statusListener = statusListener self.statusListener = statusListener
uploadObserver = NotificationCenter.default.addObserver(forName: .uploadTaskDidComplete, object: nil, queue: nil) {
[weak self] _ in
self?.startQueueProcessing()
}
downloadObserver = NotificationCenter.default.addObserver(
forName: .downloadTaskDidComplete,
object: nil,
queue: nil
) { [weak self] _ in
self?.startQueueProcessing()
}
networkObserver = NotificationCenter.default.addObserver(forName: .networkDidConnect, object: nil, queue: nil) {
[weak self] _ in
self?.startQueueProcessing()
}
} }
deinit {
uploadObserver.map(NotificationCenter.default.removeObserver(_:))
downloadObserver.map(NotificationCenter.default.removeObserver(_:))
networkObserver.map(NotificationCenter.default.removeObserver(_:))
}
func enqueueFiles(paths: [String]) async throws { func enqueueFiles(paths: [String]) async throws {
guard !paths.isEmpty else { return dPrint("No paths to enqueue") } guard !paths.isEmpty else { return dPrint("No paths to enqueue") }
guard let deviceId = (try? await db.read { conn in try Store.get(conn, StoreKey.deviceId) }) else { guard let deviceId = try? storeRepository.get(StoreKey.deviceId) else {
throw StoreError.notFound throw StoreError.notFound
} }
@ -75,72 +104,36 @@ class UploadQueue {
try await group.waitForAll() try await group.waitForAll()
} }
try await db.write { conn in try await taskRepository.enqueue(files: paths)
var draft = UploadTask.Draft(
attempts: 0,
createdAt: Date(),
filePath: nil,
isLivePhoto: nil,
lastError: nil,
livePhotoVideoId: nil,
localId: "",
method: .multipart,
priority: 0.5,
retryAfter: nil,
status: .downloadPending,
)
for path in paths {
draft.filePath = URL(fileURLWithPath: path, isDirectory: false)
try UploadTask.insert { draft }.execute(conn)
}
}
dPrint("Enqueued \(paths.count) assets for upload") dPrint("Enqueued \(paths.count) assets for upload")
} }
func startQueueProcessing() { func startQueueProcessing() {
dPrint("Starting upload queue processing") dPrint("Starting upload queue processing")
Self.queueProcessingLock.withLock { queueProcessingLock.withLock {
guard Self.queueProcessingTask == nil else { return } guard queueProcessingTask == nil else { return }
Self.queueProcessingTask = Task { queueProcessingTask = Task {
await startUploads() await startUploads()
Self.queueProcessingLock.withLock { Self.queueProcessingTask = nil } queueProcessingLock.withLock { queueProcessingTask = nil }
} }
} }
} }
private func startUploads() async { private func startUploads() async {
dPrint("Processing download queue") dPrint("Processing upload queue")
guard NetworkMonitor.shared.isConnected, guard NetworkMonitor.shared.isConnected,
let backupEnabled = try? await db.read({ conn in try Store.get(conn, StoreKey.enableBackup) }), let backupEnabled = try? storeRepository.get(StoreKey.enableBackup), backupEnabled,
backupEnabled let url = try? storeRepository.get(StoreKey.serverEndpoint),
else { return dPrint("Download queue paused: network disconnected or backup disabled") } let accessToken = try? storeRepository.get(StoreKey.accessToken)
else { return dPrint("Upload queue paused: missing preconditions") }
do { do {
let tasks: [LocalAssetUploadData] = try await db.read({ conn in let tasks = try await taskRepository.getUploadTasks()
guard let backupEnabled = try Store.get(conn, StoreKey.enableBackup), backupEnabled else { return [] }
return try UploadTask.join(LocalAsset.all) { task, asset in task.localId.eq(asset.id) }
.where { task, asset in
asset.checksum.isNot(nil) && task.status.eq(TaskStatus.uploadPending)
&& task.attempts < TaskConfig.maxRetries
&& task.filePath.isNot(nil)
}
.select { task, asset in
LocalAssetUploadData.Columns(
filePath: task.filePath.unwrapped,
priority: task.priority,
taskId: task.id,
type: asset.type
)
}
.limit { task, _ in UploadTaskStat.availableUploadSlots }
.order { task, asset in (task.priority.desc(), task.createdAt) }
.fetchAll(conn)
})
if tasks.isEmpty { return dPrint("No upload tasks to process") } if tasks.isEmpty { return dPrint("No upload tasks to process") }
await withTaskGroup(of: Void.self) { group in await withTaskGroup(of: Void.self) { group in
for task in tasks { for task in tasks {
group.addTask { await self.startUpload(multipart: task) } group.addTask { await self.startUpload(multipart: task, url: url, accessToken: accessToken) }
} }
} }
} catch { } catch {
@ -148,33 +141,18 @@ class UploadQueue {
} }
} }
private func startUpload(multipart task: LocalAssetUploadData) async { private func startUpload(multipart task: LocalAssetUploadData, url: URL, accessToken: String) async {
dPrint("Uploading asset resource at \(task.filePath) of task \(task.taskId)") dPrint("Uploading asset resource at \(task.filePath) of task \(task.taskId)")
defer { startQueueProcessing() } defer { startQueueProcessing() }
let (url, accessToken, session): (URL, String, URLSession) let session =
do { switch task.type {
(url, accessToken, session) = try await db.read { conn in case .image:
guard let url = try Store.get(conn, StoreKey.serverEndpoint), (try? storeRepository.get(StoreKey.useWifiForUploadPhotos)) ?? false ? wifiOnlySession : cellularSession
let accessToken = try Store.get(conn, StoreKey.accessToken) case .video:
else { (try? storeRepository.get(StoreKey.useWifiForUploadVideos)) ?? false ? wifiOnlySession : cellularSession
throw StoreError.notFound default: wifiOnlySession
}
let session =
switch task.type {
case .image:
(try? Store.get(conn, StoreKey.useWifiForUploadPhotos)) ?? false ? cellularSession : wifiOnlySession
case .video:
(try? Store.get(conn, StoreKey.useWifiForUploadVideos)) ?? false ? cellularSession : wifiOnlySession
default: wifiOnlySession
}
return (url, accessToken, session)
} }
} catch {
dPrint("Upload failed for \(task.taskId), could not retrieve server URL or access token: \(error)")
return handleFailure(task: task, code: .noServerUrl)
}
var request = URLRequest(url: url.appendingPathComponent("/assets")) var request = URLRequest(url: url.appendingPathComponent("/assets"))
request.httpMethod = "POST" request.httpMethod = "POST"
@ -186,18 +164,11 @@ class UploadQueue {
sessionTask.priority = task.priority sessionTask.priority = task.priority
do { do {
try? FileManager.default.removeItem(at: task.filePath) // upload task already copied the file try? FileManager.default.removeItem(at: task.filePath) // upload task already copied the file
try await db.write { conn in try await taskRepository.markUploadQueued(taskId: task.taskId)
try UploadTask.update { row in
row.status = .uploadQueued
row.filePath = nil
}
.where { $0.id.eq(task.taskId) }
.execute(conn)
}
statusListener.onTaskStatus( statusListener.onTaskStatus(
UploadApiTaskStatus( UploadApiTaskStatus(
id: String(task.taskId), id: String(task.taskId),
filename: task.filePath.lastPathComponent, filename: task.filename,
status: .uploadQueued, status: .uploadQueued,
) )
) )
@ -209,11 +180,16 @@ class UploadQueue {
} }
} }
private func handleFailure(task: LocalAssetUploadData, code: UploadErrorCode) { private func handleFailure(task: LocalAssetUploadData, code: UploadErrorCode) async {
do { do {
try db.write { conn in try await taskRepository.retryOrFail(taskId: task.taskId, code: code, status: .uploadFailed)
try UploadTask.retryOrFail(code: code, status: .uploadFailed).where { $0.id.eq(task.taskId) }.execute(conn) statusListener.onTaskStatus(
} UploadApiTaskStatus(
id: String(task.taskId),
filename: task.filename,
status: .uploadFailed
)
)
} catch { } catch {
dPrint("Failed to update upload failure status for task \(task.taskId): \(error)") dPrint("Failed to update upload failure status for task \(task.taskId): \(error)")
} }

View File

@ -122,18 +122,19 @@ enum UploadApiErrorCode: Int {
case networkError = 8 case networkError = 8
case photosInternalError = 9 case photosInternalError = 9
case photosUnknownError = 10 case photosUnknownError = 10
case noServerUrl = 11 case interrupted = 11
case noDeviceId = 12 case cancelled = 12
case noAccessToken = 13 case downloadStalled = 13
case interrupted = 14 case forceQuit = 14
case cancelled = 15 case outOfResources = 15
case downloadStalled = 16 case backgroundUpdatesDisabled = 16
case forceQuit = 17 case uploadTimeout = 17
case outOfResources = 18 case iCloudRateLimit = 18
case backgroundUpdatesDisabled = 19 case iCloudThrottled = 19
case uploadTimeout = 20 case invalidResponse = 20
case iCloudRateLimit = 21 case badRequest = 21
case iCloudThrottled = 22 case internalServerError = 22
case unauthorized = 23
} }
enum UploadApiStatus: Int { enum UploadApiStatus: Int {
@ -294,6 +295,7 @@ protocol UploadApi {
func cancelAll(completion: @escaping (Result<Void, Error>) -> Void) func cancelAll(completion: @escaping (Result<Void, Error>) -> Void)
func enqueueAssets(localIds: [String], completion: @escaping (Result<Void, Error>) -> Void) func enqueueAssets(localIds: [String], completion: @escaping (Result<Void, Error>) -> Void)
func enqueueFiles(paths: [String], completion: @escaping (Result<Void, Error>) -> Void) func enqueueFiles(paths: [String], completion: @escaping (Result<Void, Error>) -> Void)
func onConfigChange(key: Int64, completion: @escaping (Result<Void, Error>) -> Void)
} }
/// Generated setup class from Pigeon to handle messages through the `binaryMessenger`. /// Generated setup class from Pigeon to handle messages through the `binaryMessenger`.
@ -381,6 +383,23 @@ class UploadApiSetup {
} else { } else {
enqueueFilesChannel.setMessageHandler(nil) enqueueFilesChannel.setMessageHandler(nil)
} }
let onConfigChangeChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.UploadApi.onConfigChange\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
if let api = api {
onConfigChangeChannel.setMessageHandler { message, reply in
let args = message as! [Any?]
let keyArg = args[0] as! Int64
api.onConfigChange(key: keyArg) { result in
switch result {
case .success:
reply(wrapResult(nil))
case .failure(let error):
reply(wrapError(error))
}
}
}
} else {
onConfigChangeChannel.setMessageHandler(nil)
}
} }
} }

View File

@ -1,5 +1,4 @@
import SQLiteData import SQLiteData
import StructuredFieldValues
extension FileHandle { extension FileHandle {
static func createOrOverwrite(atPath path: String) throws -> FileHandle { static func createOrOverwrite(atPath path: String) throws -> FileHandle {
@ -11,33 +10,41 @@ extension FileHandle {
} }
} }
class UploadApiImpl: ImmichPlugin, UploadApi { final class UploadApiImpl<
private let db: DatabasePool StoreRepo: StoreProtocol,
private let downloadQueue: DownloadQueue TaskRepo: TaskProtocol,
private let uploadQueue: UploadQueue StatusListener: TaskStatusListener,
ProgressListener: TaskProgressListener
>: ImmichPlugin, UploadApi {
private let storeRepository: StoreRepo
private let taskRepository: TaskRepo
private let downloadQueue: DownloadQueue<StoreRepo, TaskRepo, StatusListener, ProgressListener>
private let uploadQueue: UploadQueue<StoreRepo, TaskRepo, StatusListener>
private var isInitialized = false private var isInitialized = false
private let initLock = NSLock() private let initLock = NSLock()
private var backupTask: Task<Void, Never>? private var backupTask: Task<Void, Never>?
private let backupLock = NSLock() private let backupLock = NSLock()
private let cellularSession: URLSession private let cellularSession: URLSession
private let wifiOnlySession: URLSession private let wifiOnlySession: URLSession
init(statusListener: StatusEventListener, progressListener: ProgressEventListener) { init(
let dbUrl = try! FileManager.default.url( storeRepository: StoreRepo,
for: .documentDirectory, taskRepository: TaskRepo,
in: .userDomainMask, statusListener: StatusListener,
appropriateFor: nil, progressListener: ProgressListener
create: true ) {
).appendingPathComponent("immich.sqlite") self.taskRepository = taskRepository
let delegate = UploadApiDelegate(
self.db = try! DatabasePool(path: dbUrl.path) taskRepository: taskRepository,
statusListener: statusListener,
progressListener: progressListener
)
let cellularConfig = URLSessionConfiguration.background(withIdentifier: "\(TaskConfig.sessionId).cellular") let cellularConfig = URLSessionConfiguration.background(withIdentifier: "\(TaskConfig.sessionId).cellular")
cellularConfig.allowsCellularAccess = true cellularConfig.allowsCellularAccess = true
cellularConfig.waitsForConnectivity = true cellularConfig.waitsForConnectivity = true
let delegate = UploadApiDelegate(db: db, statusListener: statusListener, progressListener: progressListener)
self.cellularSession = URLSession(configuration: cellularConfig, delegate: delegate, delegateQueue: nil) self.cellularSession = URLSession(configuration: cellularConfig, delegate: delegate, delegateQueue: nil)
let wifiOnlyConfig = URLSessionConfiguration.background(withIdentifier: "\(TaskConfig.sessionId).wifi") let wifiOnlyConfig = URLSessionConfiguration.background(withIdentifier: "\(TaskConfig.sessionId).wifi")
@ -45,28 +52,26 @@ class UploadApiImpl: ImmichPlugin, UploadApi {
wifiOnlyConfig.waitsForConnectivity = true wifiOnlyConfig.waitsForConnectivity = true
self.wifiOnlySession = URLSession(configuration: wifiOnlyConfig, delegate: delegate, delegateQueue: nil) self.wifiOnlySession = URLSession(configuration: wifiOnlyConfig, delegate: delegate, delegateQueue: nil)
self.storeRepository = storeRepository
self.uploadQueue = UploadQueue( self.uploadQueue = UploadQueue(
db: db, storeRepository: storeRepository,
taskRepository: taskRepository,
statusListener: statusListener,
cellularSession: cellularSession, cellularSession: cellularSession,
wifiOnlySession: wifiOnlySession, wifiOnlySession: wifiOnlySession
statusListener: statusListener
) )
self.downloadQueue = DownloadQueue( self.downloadQueue = DownloadQueue(
db: db, storeRepository: storeRepository,
uploadQueue: uploadQueue, taskRepository: taskRepository,
statusListener: statusListener, statusListener: statusListener,
progressListener: progressListener progressListener: progressListener
) )
delegate.downloadQueue = downloadQueue
delegate.uploadQueue = uploadQueue
} }
func initialize(completion: @escaping (Result<Void, any Error>) -> Void) { func initialize(completion: @escaping (Result<Void, any Error>) -> Void) {
Task(priority: .high) { Task(priority: .high) {
do { do {
async let dbIds = db.read { conn in async let dbIds = taskRepository.getTaskIds(status: .uploadQueued)
try UploadTask.select(\.id).where { $0.status.eq(TaskStatus.uploadQueued) }.fetchAll(conn)
}
async let cellularTasks = cellularSession.allTasks async let cellularTasks = cellularSession.allTasks
async let wifiTasks = wifiOnlySession.allTasks async let wifiTasks = wifiOnlySession.allTasks
@ -84,15 +89,7 @@ class UploadApiImpl: ImmichPlugin, UploadApi {
validateTasks(await cellularTasks) validateTasks(await cellularTasks)
validateTasks(await wifiTasks) validateTasks(await wifiTasks)
let orphanIds = Array(dbTaskIds) try await taskRepository.markOrphansPending(ids: Array(dbTaskIds))
try await db.write { conn in
try UploadTask.update {
$0.filePath = nil
$0.status = .downloadPending
}
.where { row in row.status.in([TaskStatus.downloadQueued, TaskStatus.uploadPending]) || row.id.in(orphanIds) }
.execute(conn)
}
try? FileManager.default.removeItem(at: TaskConfig.originalsDir) try? FileManager.default.removeItem(at: TaskConfig.originalsDir)
initLock.withLock { isInitialized = true } initLock.withLock { isInitialized = true }
@ -139,9 +136,9 @@ class UploadApiImpl: ImmichPlugin, UploadApi {
Task { Task {
do { do {
try await downloadQueue.enqueueAssets(localIds: localIds) try await downloadQueue.enqueueAssets(localIds: localIds)
completion(.success(())) self.completeWhenActive(for: completion, with: .success(()))
} catch { } catch {
completion(.failure(error)) self.completeWhenActive(for: completion, with: .failure(error))
} }
} }
} }
@ -150,13 +147,24 @@ class UploadApiImpl: ImmichPlugin, UploadApi {
Task { Task {
do { do {
try await uploadQueue.enqueueFiles(paths: paths) try await uploadQueue.enqueueFiles(paths: paths)
completion(.success(())) self.completeWhenActive(for: completion, with: .success(()))
} catch { } catch {
completion(.failure(error)) self.completeWhenActive(for: completion, with: .failure(error))
} }
} }
} }
func onConfigChange(key: Int64, completion: @escaping (Result<Void, any Error>) -> Void) {
storeRepository.invalidateCache()
Task {
if let key = StoreKey(rawValue: Int(key)), key == ._accessToken {
try? await taskRepository.resolveError(code: .unauthorized)
}
startBackup()
self.completeWhenActive(for: completion, with: .success(()))
}
}
private func cancelSessionTasks(_ tasks: [URLSessionTask]) { private func cancelSessionTasks(_ tasks: [URLSessionTask]) {
dPrint("Canceling \(tasks.count) tasks") dPrint("Canceling \(tasks.count) tasks")
for task in tasks { for task in tasks {
@ -165,107 +173,20 @@ class UploadApiImpl: ImmichPlugin, UploadApi {
} }
private func _startBackup() async { private func _startBackup() async {
defer { downloadQueue.startQueueProcessing() } defer {
downloadQueue.startQueueProcessing()
uploadQueue.startQueueProcessing()
}
do { do {
let candidates = try await db.read { conn in let candidates = try await taskRepository.getBackupCandidates()
return try LocalAsset.getCandidates()
.where { asset in !UploadTask.where { task in task.localId.eq(asset.id) }.exists() }
.select { LocalAssetCandidate.Columns(id: $0.id, type: $0.type) }
.limit { _ in UploadTaskStat.availableSlots }
.fetchAll(conn)
}
guard !candidates.isEmpty else { return dPrint("No candidates for backup") } guard !candidates.isEmpty else { return dPrint("No candidates for backup") }
try await db.write { conn in try await taskRepository.enqueue(assets: candidates, imagePriority: 0.5, videoPriority: 0.3)
var draft = UploadTask.Draft(
attempts: 0,
createdAt: Date(),
filePath: nil,
isLivePhoto: nil,
lastError: nil,
livePhotoVideoId: nil,
localId: "",
method: .multipart,
priority: 0.5,
retryAfter: nil,
status: .downloadPending,
)
for candidate in candidates {
draft.localId = candidate.id
draft.priority = candidate.type == .image ? 0.5 : 0.3
try UploadTask.insert {
draft
} onConflict: {
($0.localId, $0.livePhotoVideoId)
}
.execute(conn)
}
}
dPrint("Backup enqueued \(candidates.count) assets for upload") dPrint("Backup enqueued \(candidates.count) assets for upload")
} catch { } catch {
print("Backup queue error: \(error)") print("Backup queue error: \(error)")
} }
} }
} }
struct AssetData: StructuredFieldValue {
static let structuredFieldType: StructuredFieldType = .dictionary
let deviceAssetId: String
let deviceId: String
let fileCreatedAt: String
let fileModifiedAt: String
let fileName: String
let isFavorite: Bool
let livePhotoVideoId: String?
static let boundary = "Boundary-\(UUID().uuidString)"
static let deviceAssetIdField = "--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceAssetId\"\r\n\r\n"
.data(using: .utf8)!
static let deviceIdField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"deviceId\"\r\n\r\n"
.data(using: .utf8)!
static let fileCreatedAtField =
"\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileCreatedAt\"\r\n\r\n"
.data(using: .utf8)!
static let fileModifiedAtField =
"\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"fileModifiedAt\"\r\n\r\n"
.data(using: .utf8)!
static let isFavoriteField = "\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"isFavorite\"\r\n\r\n"
.data(using: .utf8)!
static let livePhotoVideoIdField =
"\r\n--\(boundary)\r\nContent-Disposition: form-data; name=\"livePhotoVideoId\"\r\n\r\n"
.data(using: .utf8)!
static let trueData = "true".data(using: .utf8)!
static let falseData = "false".data(using: .utf8)!
static let footer = "\r\n--\(boundary)--\r\n".data(using: .utf8)!
static let contentType = "multipart/form-data; boundary=\(boundary)"
func multipart() -> (Data, Data) {
var header = Data()
header.append(Self.deviceAssetIdField)
header.append(deviceAssetId.data(using: .utf8)!)
header.append(Self.deviceIdField)
header.append(deviceId.data(using: .utf8)!)
header.append(Self.fileCreatedAtField)
header.append(fileCreatedAt.data(using: .utf8)!)
header.append(Self.fileModifiedAtField)
header.append(fileModifiedAt.data(using: .utf8)!)
header.append(Self.isFavoriteField)
header.append(isFavorite ? Self.trueData : Self.falseData)
if let livePhotoVideoId {
header.append(Self.livePhotoVideoIdField)
header.append(livePhotoVideoId.data(using: .utf8)!)
}
header.append(
"\r\n--\(Self.boundary)\r\nContent-Disposition: form-data; name=\"assetData\"; filename=\"\(fileName)\"\r\nContent-Type: application/octet-stream\r\n\r\n"
.data(using: .utf8)!
)
return (header, Self.footer)
}
}

View File

@ -5,6 +5,7 @@ import 'package:immich_mobile/infrastructure/entities/store.entity.dart';
import 'package:immich_mobile/infrastructure/entities/store.entity.drift.dart'; import 'package:immich_mobile/infrastructure/entities/store.entity.drift.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/user.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/user.repository.dart';
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
import 'package:isar/isar.dart'; import 'package:isar/isar.dart';
// Temporary interface until Isar is removed to make the service work with both Isar and Sqlite // Temporary interface until Isar is removed to make the service work with both Isar and Sqlite
@ -141,6 +142,7 @@ class DriftStoreRepository extends DriftDatabaseRepository implements IStoreRepo
@override @override
Future<bool> upsert<T>(StoreKey<T> key, T value) async { Future<bool> upsert<T>(StoreKey<T> key, T value) async {
await _db.storeEntity.insertOnConflictUpdate(await _fromValue(key, value)); await _db.storeEntity.insertOnConflictUpdate(await _fromValue(key, value));
await uploadApi.onConfigChange(key.id);
return true; return true;
} }

View File

@ -41,9 +41,6 @@ enum UploadApiErrorCode {
networkError, networkError,
photosInternalError, photosInternalError,
photosUnknownError, photosUnknownError,
noServerUrl,
noDeviceId,
noAccessToken,
interrupted, interrupted,
cancelled, cancelled,
downloadStalled, downloadStalled,
@ -53,6 +50,10 @@ enum UploadApiErrorCode {
uploadTimeout, uploadTimeout,
iCloudRateLimit, iCloudRateLimit,
iCloudThrottled, iCloudThrottled,
invalidResponse,
badRequest,
internalServerError,
unauthorized,
} }
enum UploadApiStatus { enum UploadApiStatus {
@ -339,6 +340,29 @@ class UploadApi {
return; return;
} }
} }
Future<void> onConfigChange(int key) async {
final String pigeonVar_channelName =
'dev.flutter.pigeon.immich_mobile.UploadApi.onConfigChange$pigeonVar_messageChannelSuffix';
final BasicMessageChannel<Object?> pigeonVar_channel = BasicMessageChannel<Object?>(
pigeonVar_channelName,
pigeonChannelCodec,
binaryMessenger: pigeonVar_binaryMessenger,
);
final Future<Object?> pigeonVar_sendFuture = pigeonVar_channel.send(<Object?>[key]);
final List<Object?>? pigeonVar_replyList = await pigeonVar_sendFuture as List<Object?>?;
if (pigeonVar_replyList == null) {
throw _createConnectionError(pigeonVar_channelName);
} else if (pigeonVar_replyList.length > 1) {
throw PlatformException(
code: pigeonVar_replyList[0]! as String,
message: pigeonVar_replyList[1] as String?,
details: pigeonVar_replyList[2],
);
} else {
return;
}
}
} }
Stream<UploadApiTaskStatus> streamStatus({String instanceName = ''}) { Stream<UploadApiTaskStatus> streamStatus({String instanceName = ''}) {

View File

@ -12,9 +12,6 @@ enum UploadApiErrorCode {
networkError("Network error"), networkError("Network error"),
photosInternalError("Apple Photos internal error"), photosInternalError("Apple Photos internal error"),
photosUnknownError("Apple Photos unknown error"), photosUnknownError("Apple Photos unknown error"),
noServerUrl("Server URL is not set"),
noDeviceId("Device ID is not set"),
noAccessToken("Access token is not set"),
interrupted("Upload interrupted"), interrupted("Upload interrupted"),
cancelled("Upload cancelled"), cancelled("Upload cancelled"),
downloadStalled("Download stalled"), downloadStalled("Download stalled"),
@ -23,7 +20,11 @@ enum UploadApiErrorCode {
backgroundUpdatesDisabled("Background updates are disabled"), backgroundUpdatesDisabled("Background updates are disabled"),
uploadTimeout("Upload timed out"), uploadTimeout("Upload timed out"),
iCloudRateLimit("iCloud rate limit reached"), iCloudRateLimit("iCloud rate limit reached"),
iCloudThrottled("iCloud requests are being throttled"); iCloudThrottled("iCloud requests are being throttled"),
invalidResponse("Invalid response from server"),
badRequest("Server rejected the upload request"),
internalServerError("Internal server error"),
unauthorized("Unauthorized access");
final String message; final String message;
@ -87,6 +88,9 @@ abstract class UploadApi {
@async @async
void enqueueFiles(List<String> paths); void enqueueFiles(List<String> paths);
@async
void onConfigChange(int key);
} }
@EventChannelApi() @EventChannelApi()