Commit 6106c0f8 authored by NeilBrown's avatar NeilBrown Committed by Greg Kroah-Hartman

staging: lustre: lnet: convert selftest to use workqueues

Instead of the cfs workitem library, use workqueues.

As lnet wants to provide a cpu mask of allowed cpus, it
needs to be a WQ_UNBOUND work queue so that tasks can
run on cpus other than where they were submitted.

This patch also exported apply_workqueue_attrs() which is
a documented part of the workqueue API, that isn't currently
exported.  lustre needs it to allow workqueue thread to be limited
to a subset of CPUs.

Acked-by: Tejun Heo <tj@kernel.org> (for export of apply_workqueue_attrs)
Signed-off-by: default avatarNeilBrown <neilb@suse.com>
Signed-off-by: default avatarGreg Kroah-Hartman <gregkh@linuxfoundation.org>
parent a66a4e8e
...@@ -941,15 +941,13 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer, ...@@ -941,15 +941,13 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer,
return 0; return 0;
} }
static int static void
sfw_run_test(struct swi_workitem *wi) sfw_run_test(struct swi_workitem *wi)
{ {
struct sfw_test_unit *tsu = container_of(wi, struct sfw_test_unit, tsu_worker); struct sfw_test_unit *tsu = container_of(wi, struct sfw_test_unit, tsu_worker);
struct sfw_test_instance *tsi = tsu->tsu_instance; struct sfw_test_instance *tsi = tsu->tsu_instance;
struct srpc_client_rpc *rpc = NULL; struct srpc_client_rpc *rpc = NULL;
LASSERT(wi == &tsu->tsu_worker);
if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc)) { if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc)) {
LASSERT(!rpc); LASSERT(!rpc);
goto test_done; goto test_done;
...@@ -975,7 +973,7 @@ sfw_run_test(struct swi_workitem *wi) ...@@ -975,7 +973,7 @@ sfw_run_test(struct swi_workitem *wi)
rpc->crpc_timeout = rpc_timeout; rpc->crpc_timeout = rpc_timeout;
srpc_post_rpc(rpc); srpc_post_rpc(rpc);
spin_unlock(&rpc->crpc_lock); spin_unlock(&rpc->crpc_lock);
return 0; return;
test_done: test_done:
/* /*
...@@ -985,9 +983,7 @@ sfw_run_test(struct swi_workitem *wi) ...@@ -985,9 +983,7 @@ sfw_run_test(struct swi_workitem *wi)
* - my batch is still active; no one can run it again now. * - my batch is still active; no one can run it again now.
* Cancel pending schedules and prevent future schedule attempts: * Cancel pending schedules and prevent future schedule attempts:
*/ */
swi_exit_workitem(wi);
sfw_test_unit_done(tsu); sfw_test_unit_done(tsu);
return 1;
} }
static int static int
...@@ -1017,7 +1013,7 @@ sfw_run_batch(struct sfw_batch *tsb) ...@@ -1017,7 +1013,7 @@ sfw_run_batch(struct sfw_batch *tsb)
tsu->tsu_loop = tsi->tsi_loop; tsu->tsu_loop = tsi->tsi_loop;
wi = &tsu->tsu_worker; wi = &tsu->tsu_worker;
swi_init_workitem(wi, sfw_run_test, swi_init_workitem(wi, sfw_run_test,
lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid)]); lst_test_wq[lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
swi_schedule_workitem(wi); swi_schedule_workitem(wi);
} }
} }
......
...@@ -47,8 +47,8 @@ enum { ...@@ -47,8 +47,8 @@ enum {
static int lst_init_step = LST_INIT_NONE; static int lst_init_step = LST_INIT_NONE;
struct cfs_wi_sched *lst_sched_serial; struct workqueue_struct *lst_serial_wq;
struct cfs_wi_sched **lst_sched_test; struct workqueue_struct **lst_test_wq;
static void static void
lnet_selftest_exit(void) lnet_selftest_exit(void)
...@@ -68,16 +68,16 @@ lnet_selftest_exit(void) ...@@ -68,16 +68,16 @@ lnet_selftest_exit(void)
case LST_INIT_WI_TEST: case LST_INIT_WI_TEST:
for (i = 0; for (i = 0;
i < cfs_cpt_number(lnet_cpt_table()); i++) { i < cfs_cpt_number(lnet_cpt_table()); i++) {
if (!lst_sched_test[i]) if (!lst_test_wq[i])
continue; continue;
cfs_wi_sched_destroy(lst_sched_test[i]); destroy_workqueue(lst_test_wq[i]);
} }
kvfree(lst_sched_test); kvfree(lst_test_wq);
lst_sched_test = NULL; lst_test_wq = NULL;
/* fall through */ /* fall through */
case LST_INIT_WI_SERIAL: case LST_INIT_WI_SERIAL:
cfs_wi_sched_destroy(lst_sched_serial); destroy_workqueue(lst_serial_wq);
lst_sched_serial = NULL; lst_serial_wq = NULL;
case LST_INIT_NONE: case LST_INIT_NONE:
break; break;
default: default:
...@@ -92,33 +92,40 @@ lnet_selftest_init(void) ...@@ -92,33 +92,40 @@ lnet_selftest_init(void)
int rc; int rc;
int i; int i;
rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY, lst_serial_wq = alloc_ordered_workqueue("lst_s", 0);
1, &lst_sched_serial); if (!lst_serial_wq) {
if (rc) {
CERROR("Failed to create serial WI scheduler for LST\n"); CERROR("Failed to create serial WI scheduler for LST\n");
return rc; return rc;
} }
lst_init_step = LST_INIT_WI_SERIAL; lst_init_step = LST_INIT_WI_SERIAL;
nscheds = cfs_cpt_number(lnet_cpt_table()); nscheds = cfs_cpt_number(lnet_cpt_table());
lst_sched_test = kvmalloc_array(nscheds, sizeof(lst_sched_test[0]), lst_test_wq = kvmalloc_array(nscheds, sizeof(lst_test_wq[0]),
GFP_KERNEL | __GFP_ZERO); GFP_KERNEL | __GFP_ZERO);
if (!lst_sched_test) if (!lst_test_wq)
goto error; goto error;
lst_init_step = LST_INIT_WI_TEST; lst_init_step = LST_INIT_WI_TEST;
for (i = 0; i < nscheds; i++) { for (i = 0; i < nscheds; i++) {
int nthrs = cfs_cpt_weight(lnet_cpt_table(), i); int nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
struct workqueue_attrs attrs;
/* reserve at least one CPU for LND */ /* reserve at least one CPU for LND */
nthrs = max(nthrs - 1, 1); nthrs = max(nthrs - 1, 1);
rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), i, lst_test_wq[i] = alloc_workqueue("lst_t", WQ_UNBOUND, nthrs);
nthrs, &lst_sched_test[i]); if (!lst_test_wq[i]) {
if (rc) {
CWARN("Failed to create CPU partition affinity WI scheduler %d for LST\n", CWARN("Failed to create CPU partition affinity WI scheduler %d for LST\n",
i); i);
goto error; goto error;
} }
attrs.nice = 0;
#ifdef CONFIG_CPUMASK_OFFSTACK
attrs.cpumask = lnet_cpt_table()->ctb_parts[i].cpt_cpumask;
#else
cpumask_copy(attrs.cpumask, lnet_cpt_table()->ctb_parts[i].cpt_cpumask);
#endif
attrs.no_numa = false;
apply_workqueue_attrs(lst_test_wq[i], &attrs);
} }
rc = srpc_startup(); rc = srpc_startup();
......
...@@ -68,7 +68,7 @@ srpc_serv_portal(int svc_id) ...@@ -68,7 +68,7 @@ srpc_serv_portal(int svc_id)
} }
/* forward ref's */ /* forward ref's */
int srpc_handle_rpc(struct swi_workitem *wi); void srpc_handle_rpc(struct swi_workitem *wi);
void srpc_get_counters(struct srpc_counters *cnt) void srpc_get_counters(struct srpc_counters *cnt)
{ {
...@@ -178,7 +178,7 @@ srpc_init_server_rpc(struct srpc_server_rpc *rpc, ...@@ -178,7 +178,7 @@ srpc_init_server_rpc(struct srpc_server_rpc *rpc,
memset(rpc, 0, sizeof(*rpc)); memset(rpc, 0, sizeof(*rpc));
swi_init_workitem(&rpc->srpc_wi, srpc_handle_rpc, swi_init_workitem(&rpc->srpc_wi, srpc_handle_rpc,
srpc_serv_is_framework(scd->scd_svc) ? srpc_serv_is_framework(scd->scd_svc) ?
lst_sched_serial : lst_sched_test[scd->scd_cpt]); lst_serial_wq : lst_test_wq[scd->scd_cpt]);
rpc->srpc_ev.ev_fired = 1; /* no event expected now */ rpc->srpc_ev.ev_fired = 1; /* no event expected now */
...@@ -242,7 +242,7 @@ srpc_service_nrpcs(struct srpc_service *svc) ...@@ -242,7 +242,7 @@ srpc_service_nrpcs(struct srpc_service *svc)
max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN); max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
} }
int srpc_add_buffer(struct swi_workitem *wi); void srpc_add_buffer(struct swi_workitem *wi);
static int static int
srpc_service_init(struct srpc_service *svc) srpc_service_init(struct srpc_service *svc)
...@@ -277,11 +277,11 @@ srpc_service_init(struct srpc_service *svc) ...@@ -277,11 +277,11 @@ srpc_service_init(struct srpc_service *svc)
scd->scd_ev.ev_type = SRPC_REQUEST_RCVD; scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
/* /*
* NB: don't use lst_sched_serial for adding buffer, * NB: don't use lst_serial_wq for adding buffer,
* see details in srpc_service_add_buffers() * see details in srpc_service_add_buffers()
*/ */
swi_init_workitem(&scd->scd_buf_wi, swi_init_workitem(&scd->scd_buf_wi,
srpc_add_buffer, lst_sched_test[i]); srpc_add_buffer, lst_test_wq[i]);
if (i && srpc_serv_is_framework(svc)) { if (i && srpc_serv_is_framework(svc)) {
/* /*
...@@ -513,7 +513,7 @@ __must_hold(&scd->scd_lock) ...@@ -513,7 +513,7 @@ __must_hold(&scd->scd_lock)
return rc; return rc;
} }
int void
srpc_add_buffer(struct swi_workitem *wi) srpc_add_buffer(struct swi_workitem *wi)
{ {
struct srpc_service_cd *scd = container_of(wi, struct srpc_service_cd, scd_buf_wi); struct srpc_service_cd *scd = container_of(wi, struct srpc_service_cd, scd_buf_wi);
...@@ -572,7 +572,6 @@ srpc_add_buffer(struct swi_workitem *wi) ...@@ -572,7 +572,6 @@ srpc_add_buffer(struct swi_workitem *wi)
} }
spin_unlock(&scd->scd_lock); spin_unlock(&scd->scd_lock);
return 0;
} }
int int
...@@ -604,15 +603,15 @@ srpc_service_add_buffers(struct srpc_service *sv, int nbuffer) ...@@ -604,15 +603,15 @@ srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
spin_lock(&scd->scd_lock); spin_lock(&scd->scd_lock);
/* /*
* NB: srpc_service_add_buffers() can be called inside * NB: srpc_service_add_buffers() can be called inside
* thread context of lst_sched_serial, and we don't normally * thread context of lst_serial_wq, and we don't normally
* allow to sleep inside thread context of WI scheduler * allow to sleep inside thread context of WI scheduler
* because it will block current scheduler thread from doing * because it will block current scheduler thread from doing
* anything else, even worse, it could deadlock if it's * anything else, even worse, it could deadlock if it's
* waiting on result from another WI of the same scheduler. * waiting on result from another WI of the same scheduler.
* However, it's safe at here because scd_buf_wi is scheduled * However, it's safe at here because scd_buf_wi is scheduled
* by thread in a different WI scheduler (lst_sched_test), * by thread in a different WI scheduler (lst_test_wq),
* so we don't have any risk of deadlock, though this could * so we don't have any risk of deadlock, though this could
* block all WIs pending on lst_sched_serial for a moment * block all WIs pending on lst_serial_wq for a moment
* which is not good but not fatal. * which is not good but not fatal.
*/ */
lst_wait_until(scd->scd_buf_err || lst_wait_until(scd->scd_buf_err ||
...@@ -659,11 +658,9 @@ srpc_finish_service(struct srpc_service *sv) ...@@ -659,11 +658,9 @@ srpc_finish_service(struct srpc_service *sv)
LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */ LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
swi_cancel_workitem(&scd->scd_buf_wi);
spin_lock(&scd->scd_lock); spin_lock(&scd->scd_lock);
if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
spin_unlock(&scd->scd_lock);
return 0;
}
if (scd->scd_buf_nposted > 0) { if (scd->scd_buf_nposted > 0) {
CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n", CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
...@@ -679,11 +676,9 @@ srpc_finish_service(struct srpc_service *sv) ...@@ -679,11 +676,9 @@ srpc_finish_service(struct srpc_service *sv)
rpc = list_entry(scd->scd_rpc_active.next, rpc = list_entry(scd->scd_rpc_active.next,
struct srpc_server_rpc, srpc_list); struct srpc_server_rpc, srpc_list);
CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n", CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s, ev fired %d type %d status %d lnet %d\n",
rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
swi_state2str(rpc->srpc_wi.swi_state), swi_state2str(rpc->srpc_wi.swi_state),
rpc->srpc_wi.swi_workitem.wi_scheduled,
rpc->srpc_wi.swi_workitem.wi_running,
rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type, rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet); rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
spin_unlock(&scd->scd_lock); spin_unlock(&scd->scd_lock);
...@@ -946,7 +941,6 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status) ...@@ -946,7 +941,6 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
* Cancel pending schedules and prevent future schedule attempts: * Cancel pending schedules and prevent future schedule attempts:
*/ */
LASSERT(rpc->srpc_ev.ev_fired); LASSERT(rpc->srpc_ev.ev_fired);
swi_exit_workitem(&rpc->srpc_wi);
if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) { if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
buffer = list_entry(scd->scd_buf_blocked.next, buffer = list_entry(scd->scd_buf_blocked.next,
...@@ -964,7 +958,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status) ...@@ -964,7 +958,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
} }
/* handles an incoming RPC */ /* handles an incoming RPC */
int void
srpc_handle_rpc(struct swi_workitem *wi) srpc_handle_rpc(struct swi_workitem *wi)
{ {
struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc, srpc_wi); struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc, srpc_wi);
...@@ -986,9 +980,8 @@ srpc_handle_rpc(struct swi_workitem *wi) ...@@ -986,9 +980,8 @@ srpc_handle_rpc(struct swi_workitem *wi)
if (ev->ev_fired) { /* no more event, OK to finish */ if (ev->ev_fired) { /* no more event, OK to finish */
srpc_server_rpc_done(rpc, -ESHUTDOWN); srpc_server_rpc_done(rpc, -ESHUTDOWN);
return 1;
} }
return 0; return;
} }
spin_unlock(&scd->scd_lock); spin_unlock(&scd->scd_lock);
...@@ -1006,7 +999,7 @@ srpc_handle_rpc(struct swi_workitem *wi) ...@@ -1006,7 +999,7 @@ srpc_handle_rpc(struct swi_workitem *wi)
if (!msg->msg_magic) { if (!msg->msg_magic) {
/* moaned already in srpc_lnet_ev_handler */ /* moaned already in srpc_lnet_ev_handler */
srpc_server_rpc_done(rpc, EBADMSG); srpc_server_rpc_done(rpc, EBADMSG);
return 1; return;
} }
srpc_unpack_msg_hdr(msg); srpc_unpack_msg_hdr(msg);
...@@ -1022,7 +1015,7 @@ srpc_handle_rpc(struct swi_workitem *wi) ...@@ -1022,7 +1015,7 @@ srpc_handle_rpc(struct swi_workitem *wi)
LASSERT(!reply->status || !rpc->srpc_bulk); LASSERT(!reply->status || !rpc->srpc_bulk);
if (rc) { if (rc) {
srpc_server_rpc_done(rpc, rc); srpc_server_rpc_done(rpc, rc);
return 1; return;
} }
} }
...@@ -1031,7 +1024,7 @@ srpc_handle_rpc(struct swi_workitem *wi) ...@@ -1031,7 +1024,7 @@ srpc_handle_rpc(struct swi_workitem *wi)
if (rpc->srpc_bulk) { if (rpc->srpc_bulk) {
rc = srpc_do_bulk(rpc); rc = srpc_do_bulk(rpc);
if (!rc) if (!rc)
return 0; /* wait for bulk */ return; /* wait for bulk */
LASSERT(ev->ev_fired); LASSERT(ev->ev_fired);
ev->ev_status = rc; ev->ev_status = rc;
...@@ -1049,16 +1042,16 @@ srpc_handle_rpc(struct swi_workitem *wi) ...@@ -1049,16 +1042,16 @@ srpc_handle_rpc(struct swi_workitem *wi)
if (rc) { if (rc) {
srpc_server_rpc_done(rpc, rc); srpc_server_rpc_done(rpc, rc);
return 1; return;
} }
} }
wi->swi_state = SWI_STATE_REPLY_SUBMITTED; wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
rc = srpc_send_reply(rpc); rc = srpc_send_reply(rpc);
if (!rc) if (!rc)
return 0; /* wait for reply */ return; /* wait for reply */
srpc_server_rpc_done(rpc, rc); srpc_server_rpc_done(rpc, rc);
return 1; return;
case SWI_STATE_REPLY_SUBMITTED: case SWI_STATE_REPLY_SUBMITTED:
if (!ev->ev_fired) { if (!ev->ev_fired) {
...@@ -1071,10 +1064,8 @@ srpc_handle_rpc(struct swi_workitem *wi) ...@@ -1071,10 +1064,8 @@ srpc_handle_rpc(struct swi_workitem *wi)
wi->swi_state = SWI_STATE_DONE; wi->swi_state = SWI_STATE_DONE;
srpc_server_rpc_done(rpc, ev->ev_status); srpc_server_rpc_done(rpc, ev->ev_status);
return 1; return;
} }
return 0;
} }
static void static void
...@@ -1169,7 +1160,6 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status) ...@@ -1169,7 +1160,6 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
* Cancel pending schedules and prevent future schedule attempts: * Cancel pending schedules and prevent future schedule attempts:
*/ */
LASSERT(!srpc_event_pending(rpc)); LASSERT(!srpc_event_pending(rpc));
swi_exit_workitem(wi);
spin_unlock(&rpc->crpc_lock); spin_unlock(&rpc->crpc_lock);
...@@ -1177,7 +1167,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status) ...@@ -1177,7 +1167,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
} }
/* sends an outgoing RPC */ /* sends an outgoing RPC */
int void
srpc_send_rpc(struct swi_workitem *wi) srpc_send_rpc(struct swi_workitem *wi)
{ {
int rc = 0; int rc = 0;
...@@ -1213,7 +1203,7 @@ srpc_send_rpc(struct swi_workitem *wi) ...@@ -1213,7 +1203,7 @@ srpc_send_rpc(struct swi_workitem *wi)
rc = srpc_prepare_reply(rpc); rc = srpc_prepare_reply(rpc);
if (rc) { if (rc) {
srpc_client_rpc_done(rpc, rc); srpc_client_rpc_done(rpc, rc);
return 1; return;
} }
rc = srpc_prepare_bulk(rpc); rc = srpc_prepare_bulk(rpc);
...@@ -1290,7 +1280,7 @@ srpc_send_rpc(struct swi_workitem *wi) ...@@ -1290,7 +1280,7 @@ srpc_send_rpc(struct swi_workitem *wi)
wi->swi_state = SWI_STATE_DONE; wi->swi_state = SWI_STATE_DONE;
srpc_client_rpc_done(rpc, rc); srpc_client_rpc_done(rpc, rc);
return 1; return;
} }
if (rc) { if (rc) {
...@@ -1307,10 +1297,9 @@ srpc_send_rpc(struct swi_workitem *wi) ...@@ -1307,10 +1297,9 @@ srpc_send_rpc(struct swi_workitem *wi)
if (!srpc_event_pending(rpc)) { if (!srpc_event_pending(rpc)) {
srpc_client_rpc_done(rpc, -EINTR); srpc_client_rpc_done(rpc, -EINTR);
return 1; return;
} }
} }
return 0;
} }
struct srpc_client_rpc * struct srpc_client_rpc *
......
...@@ -169,11 +169,11 @@ struct srpc_buffer { ...@@ -169,11 +169,11 @@ struct srpc_buffer {
}; };
struct swi_workitem; struct swi_workitem;
typedef int (*swi_action_t) (struct swi_workitem *); typedef void (*swi_action_t) (struct swi_workitem *);
struct swi_workitem { struct swi_workitem {
struct cfs_wi_sched *swi_sched; struct workqueue_struct *swi_wq;
struct cfs_workitem swi_workitem; struct work_struct swi_work;
swi_action_t swi_action; swi_action_t swi_action;
int swi_state; int swi_state;
}; };
...@@ -444,7 +444,7 @@ void srpc_free_bulk(struct srpc_bulk *bk); ...@@ -444,7 +444,7 @@ void srpc_free_bulk(struct srpc_bulk *bk);
struct srpc_bulk *srpc_alloc_bulk(int cpt, unsigned int off, struct srpc_bulk *srpc_alloc_bulk(int cpt, unsigned int off,
unsigned int bulk_npg, unsigned int bulk_len, unsigned int bulk_npg, unsigned int bulk_len,
int sink); int sink);
int srpc_send_rpc(struct swi_workitem *wi); void srpc_send_rpc(struct swi_workitem *wi);
int srpc_send_reply(struct srpc_server_rpc *rpc); int srpc_send_reply(struct srpc_server_rpc *rpc);
int srpc_add_service(struct srpc_service *sv); int srpc_add_service(struct srpc_service *sv);
int srpc_remove_service(struct srpc_service *sv); int srpc_remove_service(struct srpc_service *sv);
...@@ -456,8 +456,8 @@ void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer); ...@@ -456,8 +456,8 @@ void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer);
void srpc_get_counters(struct srpc_counters *cnt); void srpc_get_counters(struct srpc_counters *cnt);
void srpc_set_counters(const struct srpc_counters *cnt); void srpc_set_counters(const struct srpc_counters *cnt);
extern struct cfs_wi_sched *lst_sched_serial; extern struct workqueue_struct *lst_serial_wq;
extern struct cfs_wi_sched **lst_sched_test; extern struct workqueue_struct **lst_test_wq;
static inline int static inline int
srpc_serv_is_framework(struct srpc_service *svc) srpc_serv_is_framework(struct srpc_service *svc)
...@@ -465,42 +465,36 @@ srpc_serv_is_framework(struct srpc_service *svc) ...@@ -465,42 +465,36 @@ srpc_serv_is_framework(struct srpc_service *svc)
return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID; return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID;
} }
static inline int static void
swi_wi_action(struct cfs_workitem *wi) swi_wi_action(struct work_struct *wi)
{ {
struct swi_workitem *swi; struct swi_workitem *swi;
swi = container_of(wi, struct swi_workitem, swi_workitem); swi = container_of(wi, struct swi_workitem, swi_work);
return swi->swi_action(swi); swi->swi_action(swi);
} }
static inline void static inline void
swi_init_workitem(struct swi_workitem *swi, swi_init_workitem(struct swi_workitem *swi,
swi_action_t action, struct cfs_wi_sched *sched) swi_action_t action, struct workqueue_struct *wq)
{ {
swi->swi_sched = sched; swi->swi_wq = wq;
swi->swi_action = action; swi->swi_action = action;
swi->swi_state = SWI_STATE_NEWBORN; swi->swi_state = SWI_STATE_NEWBORN;
cfs_wi_init(&swi->swi_workitem, swi_wi_action); INIT_WORK(&swi->swi_work, swi_wi_action);
} }
static inline void static inline void
swi_schedule_workitem(struct swi_workitem *wi) swi_schedule_workitem(struct swi_workitem *wi)
{ {
cfs_wi_schedule(wi->swi_sched, &wi->swi_workitem); queue_work(wi->swi_wq, &wi->swi_work);
}
static inline void
swi_exit_workitem(struct swi_workitem *swi)
{
cfs_wi_exit(swi->swi_sched, &swi->swi_workitem);
} }
static inline int static inline int
swi_deschedule_workitem(struct swi_workitem *swi) swi_cancel_workitem(struct swi_workitem *swi)
{ {
return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem); return cancel_work_sync(&swi->swi_work);
} }
int sfw_startup(void); int sfw_startup(void);
...@@ -534,7 +528,7 @@ srpc_init_client_rpc(struct srpc_client_rpc *rpc, struct lnet_process_id peer, ...@@ -534,7 +528,7 @@ srpc_init_client_rpc(struct srpc_client_rpc *rpc, struct lnet_process_id peer,
INIT_LIST_HEAD(&rpc->crpc_list); INIT_LIST_HEAD(&rpc->crpc_list);
swi_init_workitem(&rpc->crpc_wi, srpc_send_rpc, swi_init_workitem(&rpc->crpc_wi, srpc_send_rpc,
lst_sched_test[lnet_cpt_of_nid(peer.nid)]); lst_test_wq[lnet_cpt_of_nid(peer.nid)]);
spin_lock_init(&rpc->crpc_lock); spin_lock_init(&rpc->crpc_lock);
atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */ atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */
......
...@@ -3806,6 +3806,7 @@ int apply_workqueue_attrs(struct workqueue_struct *wq, ...@@ -3806,6 +3806,7 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
return ret; return ret;
} }
EXPORT_SYMBOL_GPL(apply_workqueue_attrs);
/** /**
* wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment