io_uring/cancel: move cancelation code from io_uring.c to cancel.c
There's a bunch of code strictly dealing with cancelations, and that code really belongs in cancel.c rather than in the core io_uring.c file. Move it there. The move is mostly mechanical; the only real oddity is that struct io_defer_entry now needs to be visible to both io_uring.c and cancel.c.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 01e019b2a3
commit ffce324364
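At a glance, the interface change boils down to the sketch below. This is only a condensed view of the header hunks further down; the file names are inferred from the includes and commit message and may not match the tree exactly. struct io_defer_entry moves into the shared internal header so both compilation units can see it, and the cancelation entry points lose their static linkage in favor of prototypes in cancel.h:

/* io_uring/io_uring.h (inferred): now visible to both io_uring.c and cancel.c */
struct io_defer_entry {
        struct list_head list;
        struct io_kiocb *req;
};

/* io_uring/cancel.h (inferred): former io_uring.c statics, now exported */
__cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
                                         struct io_uring_task *tctx,
                                         bool cancel_all, bool is_sqpoll_thread);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
__cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data);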
--- a/io_uring/cancel.c
+++ b/io_uring/cancel.c
@@ -14,6 +14,8 @@
 #include "filetable.h"
 #include "io_uring.h"
 #include "tctx.h"
+#include "sqpoll.h"
+#include "uring_cmd.h"
 #include "poll.h"
 #include "timeout.h"
 #include "waitid.h"
@@ -428,3 +430,227 @@ void __io_uring_cancel(bool cancel_all)
 	io_uring_unreg_ringfd();
 	io_uring_cancel_generic(cancel_all, NULL);
 }
+
+struct io_task_cancel {
+	struct io_uring_task *tctx;
+	bool all;
+};
+
+static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
+{
+	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+	struct io_task_cancel *cancel = data;
+
+	return io_match_task_safe(req, cancel->tctx, cancel->all);
+}
+
+static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
+					 struct io_uring_task *tctx,
+					 bool cancel_all)
+{
+	struct io_defer_entry *de;
+	LIST_HEAD(list);
+
+	list_for_each_entry_reverse(de, &ctx->defer_list, list) {
+		if (io_match_task_safe(de->req, tctx, cancel_all)) {
+			list_cut_position(&list, &ctx->defer_list, &de->list);
+			break;
+		}
+	}
+	if (list_empty(&list))
+		return false;
+
+	while (!list_empty(&list)) {
+		de = list_first_entry(&list, struct io_defer_entry, list);
+		list_del_init(&de->list);
+		ctx->nr_drained -= io_linked_nr(de->req);
+		io_req_task_queue_fail(de->req, -ECANCELED);
+		kfree(de);
+	}
+	return true;
+}
+
+__cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
+{
+	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+
+	return req->ctx == data;
+}
+
+static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
+{
+	struct io_tctx_node *node;
+	enum io_wq_cancel cret;
+	bool ret = false;
+
+	mutex_lock(&ctx->uring_lock);
+	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
+		struct io_uring_task *tctx = node->task->io_uring;
+
+		/*
+		 * io_wq will stay alive while we hold uring_lock, because it's
+		 * killed after ctx nodes, which requires to take the lock.
+		 */
+		if (!tctx || !tctx->io_wq)
+			continue;
+		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
+		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
+	}
+	mutex_unlock(&ctx->uring_lock);
+
+	return ret;
+}
+
+__cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
+					 struct io_uring_task *tctx,
+					 bool cancel_all, bool is_sqpoll_thread)
+{
+	struct io_task_cancel cancel = { .tctx = tctx, .all = cancel_all, };
+	enum io_wq_cancel cret;
+	bool ret = false;
+
+	/* set it so io_req_local_work_add() would wake us up */
+	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
+		atomic_set(&ctx->cq_wait_nr, 1);
+		smp_mb();
+	}
+
+	/* failed during ring init, it couldn't have issued any requests */
+	if (!ctx->rings)
+		return false;
+
+	if (!tctx) {
+		ret |= io_uring_try_cancel_iowq(ctx);
+	} else if (tctx->io_wq) {
+		/*
+		 * Cancels requests of all rings, not only @ctx, but
+		 * it's fine as the task is in exit/exec.
+		 */
+		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
+				       &cancel, true);
+		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
+	}
+
+	/* SQPOLL thread does its own polling */
+	if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
+	    is_sqpoll_thread) {
+		while (!wq_list_empty(&ctx->iopoll_list)) {
+			io_iopoll_try_reap_events(ctx);
+			ret = true;
+			cond_resched();
+		}
+	}
+
+	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+	    io_allowed_defer_tw_run(ctx))
+		ret |= io_run_local_work(ctx, INT_MAX, INT_MAX) > 0;
+	mutex_lock(&ctx->uring_lock);
+	ret |= io_cancel_defer_files(ctx, tctx, cancel_all);
+	ret |= io_poll_remove_all(ctx, tctx, cancel_all);
+	ret |= io_waitid_remove_all(ctx, tctx, cancel_all);
+	ret |= io_futex_remove_all(ctx, tctx, cancel_all);
+	ret |= io_uring_try_cancel_uring_cmd(ctx, tctx, cancel_all);
+	mutex_unlock(&ctx->uring_lock);
+	ret |= io_kill_timeouts(ctx, tctx, cancel_all);
+	if (tctx)
+		ret |= io_run_task_work() > 0;
+	else
+		ret |= flush_delayed_work(&ctx->fallback_work);
+	return ret;
+}
+
+static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
+{
+	if (tracked)
+		return atomic_read(&tctx->inflight_tracked);
+	return percpu_counter_sum(&tctx->inflight);
+}
+
+/*
+ * Find any io_uring ctx that this task has registered or done IO on, and cancel
+ * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
+ */
+__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
+{
+	struct io_uring_task *tctx = current->io_uring;
+	struct io_ring_ctx *ctx;
+	struct io_tctx_node *node;
+	unsigned long index;
+	s64 inflight;
+	DEFINE_WAIT(wait);
+
+	WARN_ON_ONCE(sqd && sqpoll_task_locked(sqd) != current);
+
+	if (!current->io_uring)
+		return;
+	if (tctx->io_wq)
+		io_wq_exit_start(tctx->io_wq);
+
+	atomic_inc(&tctx->in_cancel);
+	do {
+		bool loop = false;
+
+		io_uring_drop_tctx_refs(current);
+		if (!tctx_inflight(tctx, !cancel_all))
+			break;
+
+		/* read completions before cancelations */
+		inflight = tctx_inflight(tctx, false);
+		if (!inflight)
+			break;
+
+		if (!sqd) {
+			xa_for_each(&tctx->xa, index, node) {
+				/* sqpoll task will cancel all its requests */
+				if (node->ctx->sq_data)
+					continue;
+				loop |= io_uring_try_cancel_requests(node->ctx,
+							current->io_uring,
+							cancel_all,
+							false);
+			}
+		} else {
+			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
+				loop |= io_uring_try_cancel_requests(ctx,
+							current->io_uring,
+							cancel_all,
+							true);
+		}
+
+		if (loop) {
+			cond_resched();
+			continue;
+		}
+
+		prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
+		io_run_task_work();
+		io_uring_drop_tctx_refs(current);
+		xa_for_each(&tctx->xa, index, node) {
+			if (io_local_work_pending(node->ctx)) {
+				WARN_ON_ONCE(node->ctx->submitter_task &&
+					     node->ctx->submitter_task != current);
+				goto end_wait;
+			}
+		}
+		/*
+		 * If we've seen completions, retry without waiting. This
+		 * avoids a race where a completion comes in before we did
+		 * prepare_to_wait().
+		 */
+		if (inflight == tctx_inflight(tctx, !cancel_all))
+			schedule();
+end_wait:
+		finish_wait(&tctx->wait, &wait);
+	} while (1);
+
+	io_uring_clean_tctx(tctx);
+	if (cancel_all) {
+		/*
+		 * We shouldn't run task_works after cancel, so just leave
+		 * ->in_cancel set for normal exit.
+		 */
+		atomic_dec(&tctx->in_cancel);
+		/* for exec all current's requests should be gone, kill tctx */
+		__io_uring_free(current);
+	}
+}
--- a/io_uring/cancel.h
+++ b/io_uring/cancel.h
@@ -29,10 +29,14 @@ bool io_match_task_safe(struct io_kiocb *head, struct io_uring_task *tctx,
 bool io_cancel_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx,
			   struct hlist_head *list, bool cancel_all,
			   bool (*cancel)(struct io_kiocb *));

 int io_cancel_remove(struct io_ring_ctx *ctx, struct io_cancel_data *cd,
		      unsigned int issue_flags, struct hlist_head *list,
		      bool (*cancel)(struct io_kiocb *));
+__cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
+					 struct io_uring_task *tctx,
+					 bool cancel_all, bool is_sqpoll_thread);
+__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
+__cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data);

 static inline bool io_cancel_match_sequence(struct io_kiocb *req, int sequence)
 {
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -124,11 +124,6 @@
 #define IO_REQ_ALLOC_BATCH 8
 #define IO_LOCAL_TW_DEFAULT_MAX 20

-struct io_defer_entry {
-	struct list_head list;
-	struct io_kiocb *req;
-};
-
 /* requests with any of those set should undergo io_disarm_next() */
 #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
@@ -140,11 +135,6 @@ struct io_defer_entry {
 /* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
 #define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)

-static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
-					 struct io_uring_task *tctx,
-					 bool cancel_all,
-					 bool is_sqpoll_thread);
-
 static void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags);
 static void __io_req_caches_free(struct io_ring_ctx *ctx);
@@ -512,7 +502,7 @@ void io_req_queue_iowq(struct io_kiocb *req)
 	io_req_task_work_add(req);
 }

-static unsigned io_linked_nr(struct io_kiocb *req)
+unsigned io_linked_nr(struct io_kiocb *req)
 {
 	struct io_kiocb *tmp;
 	unsigned nr = 0;
@@ -681,7 +671,7 @@ void io_task_refs_refill(struct io_uring_task *tctx)
 	tctx->cached_refs += refill;
 }

-static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
+__cold void io_uring_drop_tctx_refs(struct task_struct *task)
 {
 	struct io_uring_task *tctx = task->io_uring;
 	unsigned int refs = tctx->cached_refs;
@@ -1409,8 +1399,7 @@ static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
			      max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
 }

-static int io_run_local_work(struct io_ring_ctx *ctx, int min_events,
-			     int max_events)
+int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events)
 {
 	struct io_tw_state ts = {};
 	int ret;
@@ -1564,7 +1553,7 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx)
  * We can't just wait for polled events to come to us, we have to actively
  * find and complete them.
  */
-static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
+__cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
 {
 	if (!(ctx->flags & IORING_SETUP_IOPOLL))
 		return;
@@ -2978,13 +2967,6 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb)
 	complete(&work->completion);
 }

-static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
-{
-	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-
-	return req->ctx == data;
-}
-
 static __cold void io_ring_exit_work(struct work_struct *work)
 {
 	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
@@ -3118,224 +3100,6 @@ static int io_uring_release(struct inode *inode, struct file *file)
 	return 0;
 }

-struct io_task_cancel {
-	struct io_uring_task *tctx;
-	bool all;
-};
-
-static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
-{
-	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-	struct io_task_cancel *cancel = data;
-
-	return io_match_task_safe(req, cancel->tctx, cancel->all);
-}
-
-static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
-					 struct io_uring_task *tctx,
-					 bool cancel_all)
-{
-	struct io_defer_entry *de;
-	LIST_HEAD(list);
-
-	list_for_each_entry_reverse(de, &ctx->defer_list, list) {
-		if (io_match_task_safe(de->req, tctx, cancel_all)) {
-			list_cut_position(&list, &ctx->defer_list, &de->list);
-			break;
-		}
-	}
-	if (list_empty(&list))
-		return false;
-
-	while (!list_empty(&list)) {
-		de = list_first_entry(&list, struct io_defer_entry, list);
-		list_del_init(&de->list);
-		ctx->nr_drained -= io_linked_nr(de->req);
-		io_req_task_queue_fail(de->req, -ECANCELED);
-		kfree(de);
-	}
-	return true;
-}
-
-static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
-{
-	struct io_tctx_node *node;
-	enum io_wq_cancel cret;
-	bool ret = false;
-
-	mutex_lock(&ctx->uring_lock);
-	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
-		struct io_uring_task *tctx = node->task->io_uring;
-
-		/*
-		 * io_wq will stay alive while we hold uring_lock, because it's
-		 * killed after ctx nodes, which requires to take the lock.
-		 */
-		if (!tctx || !tctx->io_wq)
-			continue;
-		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
-		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
-	}
-	mutex_unlock(&ctx->uring_lock);
-
-	return ret;
-}
-
-static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
-						struct io_uring_task *tctx,
-						bool cancel_all,
-						bool is_sqpoll_thread)
-{
-	struct io_task_cancel cancel = { .tctx = tctx, .all = cancel_all, };
-	enum io_wq_cancel cret;
-	bool ret = false;
-
-	/* set it so io_req_local_work_add() would wake us up */
-	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
-		atomic_set(&ctx->cq_wait_nr, 1);
-		smp_mb();
-	}
-
-	/* failed during ring init, it couldn't have issued any requests */
-	if (!ctx->rings)
-		return false;
-
-	if (!tctx) {
-		ret |= io_uring_try_cancel_iowq(ctx);
-	} else if (tctx->io_wq) {
-		/*
-		 * Cancels requests of all rings, not only @ctx, but
-		 * it's fine as the task is in exit/exec.
-		 */
-		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
-				       &cancel, true);
-		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
-	}
-
-	/* SQPOLL thread does its own polling */
-	if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
-	    is_sqpoll_thread) {
-		while (!wq_list_empty(&ctx->iopoll_list)) {
-			io_iopoll_try_reap_events(ctx);
-			ret = true;
-			cond_resched();
-		}
-	}
-
-	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
-	    io_allowed_defer_tw_run(ctx))
-		ret |= io_run_local_work(ctx, INT_MAX, INT_MAX) > 0;
-	mutex_lock(&ctx->uring_lock);
-	ret |= io_cancel_defer_files(ctx, tctx, cancel_all);
-	ret |= io_poll_remove_all(ctx, tctx, cancel_all);
-	ret |= io_waitid_remove_all(ctx, tctx, cancel_all);
-	ret |= io_futex_remove_all(ctx, tctx, cancel_all);
-	ret |= io_uring_try_cancel_uring_cmd(ctx, tctx, cancel_all);
-	mutex_unlock(&ctx->uring_lock);
-	ret |= io_kill_timeouts(ctx, tctx, cancel_all);
-	if (tctx)
-		ret |= io_run_task_work() > 0;
-	else
-		ret |= flush_delayed_work(&ctx->fallback_work);
-	return ret;
-}
-
-static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
-{
-	if (tracked)
-		return atomic_read(&tctx->inflight_tracked);
-	return percpu_counter_sum(&tctx->inflight);
-}
-
-/*
- * Find any io_uring ctx that this task has registered or done IO on, and cancel
- * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
- */
-__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
-{
-	struct io_uring_task *tctx = current->io_uring;
-	struct io_ring_ctx *ctx;
-	struct io_tctx_node *node;
-	unsigned long index;
-	s64 inflight;
-	DEFINE_WAIT(wait);
-
-	WARN_ON_ONCE(sqd && sqpoll_task_locked(sqd) != current);
-
-	if (!current->io_uring)
-		return;
-	if (tctx->io_wq)
-		io_wq_exit_start(tctx->io_wq);
-
-	atomic_inc(&tctx->in_cancel);
-	do {
-		bool loop = false;
-
-		io_uring_drop_tctx_refs(current);
-		if (!tctx_inflight(tctx, !cancel_all))
-			break;
-
-		/* read completions before cancelations */
-		inflight = tctx_inflight(tctx, false);
-		if (!inflight)
-			break;
-
-		if (!sqd) {
-			xa_for_each(&tctx->xa, index, node) {
-				/* sqpoll task will cancel all its requests */
-				if (node->ctx->sq_data)
-					continue;
-				loop |= io_uring_try_cancel_requests(node->ctx,
-							current->io_uring,
-							cancel_all,
-							false);
-			}
-		} else {
-			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
-				loop |= io_uring_try_cancel_requests(ctx,
-							current->io_uring,
-							cancel_all,
-							true);
-		}
-
-		if (loop) {
-			cond_resched();
-			continue;
-		}
-
-		prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
-		io_run_task_work();
-		io_uring_drop_tctx_refs(current);
-		xa_for_each(&tctx->xa, index, node) {
-			if (io_local_work_pending(node->ctx)) {
-				WARN_ON_ONCE(node->ctx->submitter_task &&
-					     node->ctx->submitter_task != current);
-				goto end_wait;
-			}
-		}
-		/*
-		 * If we've seen completions, retry without waiting. This
-		 * avoids a race where a completion comes in before we did
-		 * prepare_to_wait().
-		 */
-		if (inflight == tctx_inflight(tctx, !cancel_all))
-			schedule();
-end_wait:
-		finish_wait(&tctx->wait, &wait);
-	} while (1);
-
-	io_uring_clean_tctx(tctx);
-	if (cancel_all) {
-		/*
-		 * We shouldn't run task_works after cancel, so just leave
-		 * ->in_cancel set for normal exit.
-		 */
-		atomic_dec(&tctx->in_cancel);
-		/* for exec all current's requests should be gone, kill tctx */
-		__io_uring_free(current);
-	}
-}
-
 static struct io_uring_reg_wait *io_get_ext_arg_reg(struct io_ring_ctx *ctx,
					const struct io_uring_getevents_arg __user *uarg)
 {
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -96,6 +96,11 @@ enum {
 	IOU_REQUEUE = -3072,
 };

+struct io_defer_entry {
+	struct list_head list;
+	struct io_kiocb *req;
+};
+
 struct io_wait_queue {
 	struct wait_queue_entry wq;
 	struct io_ring_ctx *ctx;
@@ -134,6 +139,7 @@ unsigned long rings_size(unsigned int flags, unsigned int sq_entries,
 int io_uring_fill_params(unsigned entries, struct io_uring_params *p);
 bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow, bool cqe32);
 int io_run_task_work_sig(struct io_ring_ctx *ctx);
+int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events);
 void io_req_defer_failed(struct io_kiocb *req, s32 res);
 bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
 void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
@@ -141,6 +147,7 @@ bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags);
 bool io_req_post_cqe32(struct io_kiocb *req, struct io_uring_cqe src_cqe[2]);
 void __io_commit_cqring_flush(struct io_ring_ctx *ctx);

+unsigned io_linked_nr(struct io_kiocb *req);
 void io_req_track_inflight(struct io_kiocb *req);
 struct file *io_file_get_normal(struct io_kiocb *req, int fd);
 struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
@@ -155,7 +162,7 @@ void io_req_task_submit(struct io_tw_req tw_req, io_tw_token_t tw);
 struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
 struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
 void tctx_task_work(struct callback_head *cb);
-__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
+__cold void io_uring_drop_tctx_refs(struct task_struct *task);

 int io_ring_add_registered_file(struct io_uring_task *tctx, struct file *file,
			     int start, int end);
@@ -164,6 +171,7 @@ void io_req_queue_iowq(struct io_kiocb *req);
 int io_poll_issue(struct io_kiocb *req, io_tw_token_t tw);
 int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
 int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
+__cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx);
 void __io_submit_flush_completions(struct io_ring_ctx *ctx);

 struct io_wq_work *io_wq_free_work(struct io_wq_work *work);
--- a/io_uring/sqpoll.c
+++ b/io_uring/sqpoll.c
@@ -18,6 +18,7 @@
 #include "io_uring.h"
 #include "tctx.h"
 #include "napi.h"
+#include "cancel.h"
 #include "sqpoll.h"

 #define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
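For orientation only, here is a minimal, hypothetical sketch of how a caller outside cancel.c would now reach the moved entry points through the cancel.h prototypes; the function name and call site below are illustrative assumptions and not part of this patch:

/* illustrative only -- not part of the patch */
#include "io_uring.h"
#include "cancel.h"

static void example_ring_teardown(struct io_ring_ctx *ctx)
{
	/* no submitting task context (tctx == NULL): cancel everything on @ctx */
	while (io_uring_try_cancel_requests(ctx, NULL, true, false))
		cond_resched();
}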