Introduce GRO support to cpumap codebase #8149

Open
wants to merge 3 commits into base: bpf-next_base
3 changes: 3 additions & 0 deletions include/linux/netdevice.h
@@ -2677,6 +2677,9 @@ static inline void netif_napi_set_irq(struct napi_struct *napi, int irq)
*/
#define NAPI_POLL_WEIGHT 64

int napi_threaded_poll(void *data);
int napi_init_for_gro(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight);
void netif_napi_add_weight(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight);

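For context, a minimal sketch (not part of this diff) of how a device-less consumer could use the two helpers declared above to run its own threaded NAPI context with GRO. The struct my_ctx, my_poll() and my_ctx_setup() names, the thread name format and the error handling are illustrative assumptions; the real consumer added by this series is cpumap below.

/* Illustrative sketch only: a consumer without a real net_device wiring up
 * a threaded NAPI instance via the helpers declared above. my_ctx, my_poll
 * and my_ctx_setup are hypothetical names, not part of this series.
 */
#include <linux/err.h>
#include <linux/kthread.h>
#include <linux/netdevice.h>

struct my_ctx {
	struct napi_struct napi;
};

static int my_poll(struct napi_struct *napi, int budget)
{
	int done = 0;

	/* Consume up to 'budget' packets here and feed each skb to
	 * napi_gro_receive(napi, skb), incrementing 'done' per packet.
	 */
	if (done < budget)
		napi_complete(napi);

	return done;
}

static int my_ctx_setup(struct my_ctx *ctx, int cpu)
{
	int err;

	/* No net_device backing this NAPI context, hence the NULL dev. */
	err = napi_init_for_gro(NULL, &ctx->napi, my_poll, NAPI_POLL_WEIGHT);
	if (err)
		return err;

	/* Run the poll loop from a dedicated kthread pinned to 'cpu'. */
	set_bit(NAPI_STATE_THREADED, &ctx->napi.state);
	ctx->napi.thread = kthread_run_on_cpu(napi_threaded_poll, &ctx->napi,
					      cpu, "my-napi/%d");
	if (IS_ERR(ctx->napi.thread))
		return PTR_ERR(ctx->napi.thread);

	napi_schedule(&ctx->napi);

	return 0;
}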
125 changes: 52 additions & 73 deletions kernel/bpf/cpumap.c
@@ -62,9 +62,11 @@ struct bpf_cpu_map_entry {
/* XDP can run multiple RX-ring queues, need __percpu enqueue store */
struct xdp_bulk_queue __percpu *bulkq;

/* Queue with potential multi-producers, and single-consumer kthread */
/* Queue with potential multi-producers, and single-consumer
* NAPI-kthread
*/
struct ptr_ring *queue;
struct task_struct *kthread;
struct napi_struct napi;

struct bpf_cpumap_val value;
struct bpf_prog *prog;
@@ -261,58 +263,42 @@ static int cpu_map_bpf_prog_run(struct bpf_cpu_map_entry *rcpu, void **frames,
return nframes;
}

static int cpu_map_kthread_run(void *data)
static int cpu_map_poll(struct napi_struct *napi, int budget)
{
struct bpf_cpu_map_entry *rcpu = data;
unsigned long last_qs = jiffies;
struct xdp_cpumap_stats stats = {}; /* zero stats */
unsigned int kmem_alloc_drops = 0;
struct bpf_cpu_map_entry *rcpu;
int done = 0;

rcu_read_lock();
rcpu = container_of(napi, struct bpf_cpu_map_entry, napi);
complete(&rcpu->kthread_running);
set_current_state(TASK_INTERRUPTIBLE);

/* When kthread gives stop order, then rcpu have been disconnected
* from map, thus no new packets can enter. Remaining in-flight
* per CPU stored packets are flushed to this queue. Wait honoring
* kthread_stop signal until queue is empty.
*/
while (!kthread_should_stop() || !__ptr_ring_empty(rcpu->queue)) {
struct xdp_cpumap_stats stats = {}; /* zero stats */
unsigned int kmem_alloc_drops = 0, sched = 0;
while (done < budget) {
gfp_t gfp = __GFP_ZERO | GFP_ATOMIC;
int i, n, m, nframes, xdp_n;
int n, i, m, xdp_n = 0, nframes;
void *frames[CPUMAP_BATCH];
struct sk_buff *skb, *tmp;
void *skbs[CPUMAP_BATCH];
LIST_HEAD(list);

/* Release CPU reschedule checks */
if (__ptr_ring_empty(rcpu->queue)) {
set_current_state(TASK_INTERRUPTIBLE);
/* Recheck to avoid lost wake-up */
if (__ptr_ring_empty(rcpu->queue)) {
schedule();
sched = 1;
last_qs = jiffies;
} else {
__set_current_state(TASK_RUNNING);
}
} else {
rcu_softirq_qs_periodic(last_qs);
sched = cond_resched();
}

if (__ptr_ring_empty(rcpu->queue))
break;
/*
* The bpf_cpu_map_entry is single consumer, with this
* kthread CPU pinned. Lockless access to ptr_ring
* consume side valid as no-resize allowed of queue.
*/
n = __ptr_ring_consume_batched(rcpu->queue, frames,
CPUMAP_BATCH);
for (i = 0, xdp_n = 0; i < n; i++) {
n = min(budget - done, CPUMAP_BATCH);
n = __ptr_ring_consume_batched(rcpu->queue, frames, n);
done += n;

for (i = 0; i < n; i++) {
void *f = frames[i];
struct page *page;

if (unlikely(__ptr_test_bit(0, &f))) {
struct sk_buff *skb = f;

skb = f;
__ptr_clear_bit(0, &skb);
list_add_tail(&skb->list, &list);
continue;
@@ -340,12 +326,10 @@ static int cpu_map_kthread_run(void *data)
}
}

local_bh_disable();
for (i = 0; i < nframes; i++) {
struct xdp_frame *xdpf = frames[i];
struct sk_buff *skb = skbs[i];

skb = __xdp_build_skb_from_frame(xdpf, skb,
skb = __xdp_build_skb_from_frame(xdpf, skbs[i],
xdpf->dev_rx);
if (!skb) {
xdp_return_frame(xdpf);
@@ -355,18 +339,20 @@ static int cpu_map_kthread_run(void *data)
list_add_tail(&skb->list, &list);
}

/* Feedback loop via tracepoint.
* NB: keep before recv to allow measuring enqueue/dequeue latency.
*/
trace_xdp_cpumap_kthread(rcpu->map_id, n, kmem_alloc_drops,
sched, &stats);

netif_receive_skb_list(&list);
local_bh_enable(); /* resched point, may call do_softirq() */
list_for_each_entry_safe(skb, tmp, &list, list) {
skb_list_del_init(skb);
napi_gro_receive(napi, skb);
}
}
__set_current_state(TASK_RUNNING);

return 0;
rcu_read_unlock();
/* Feedback loop via tracepoint */
trace_xdp_cpumap_kthread(rcpu->map_id, done, kmem_alloc_drops, 0,
&stats);
if (done < budget)
napi_complete(napi);

return done;
}

static int __cpu_map_load_bpf_program(struct bpf_cpu_map_entry *rcpu,
@@ -434,18 +420,19 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
if (fd > 0 && __cpu_map_load_bpf_program(rcpu, map, fd))
goto free_ptr_ring;

napi_init_for_gro(NULL, &rcpu->napi, cpu_map_poll,
NAPI_POLL_WEIGHT);
set_bit(NAPI_STATE_THREADED, &rcpu->napi.state);

/* Setup kthread */
init_completion(&rcpu->kthread_running);
rcpu->kthread = kthread_create_on_node(cpu_map_kthread_run, rcpu, numa,
"cpumap/%d/map:%d", cpu,
map->id);
if (IS_ERR(rcpu->kthread))
rcpu->napi.thread = kthread_run_on_cpu(napi_threaded_poll,
&rcpu->napi, cpu,
"cpumap-napi/%d");
if (IS_ERR(rcpu->napi.thread))
goto free_prog;

/* Make sure kthread runs on a single CPU */
kthread_bind(rcpu->kthread, cpu);
wake_up_process(rcpu->kthread);

napi_schedule(&rcpu->napi);
/* Make sure kthread has been running, so kthread_stop() will not
* stop the kthread prematurely and all pending frames or skbs
* will be handled by the kthread before kthread_stop() returns.
@@ -479,12 +466,8 @@ static void __cpu_map_entry_free(struct work_struct *work)
*/
rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);

/* kthread_stop will wake_up_process and wait for it to complete.
* cpu_map_kthread_run() makes sure the pointer ring is empty
* before exiting.
*/
kthread_stop(rcpu->kthread);

napi_disable(&rcpu->napi);
__netif_napi_del(&rcpu->napi);
if (rcpu->prog)
bpf_prog_put(rcpu->prog);
/* The queue should be empty at this point */
@@ -500,8 +483,8 @@ static void __cpu_map_entry_free(struct work_struct *work)
* __cpu_map_entry_free() in a separate workqueue after waiting for an RCU grace
* period. This means that (a) all pending enqueue and flush operations have
* completed (because of the RCU callback), and (b) we are in a workqueue
* context where we can stop the kthread and wait for it to exit before freeing
* everything.
* context where we can stop the NAPI-kthread and wait for it to exit before
* freeing everything.
*/
static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
u32 key_cpu, struct bpf_cpu_map_entry *rcpu)
@@ -581,17 +564,15 @@ static void cpu_map_free(struct bpf_map *map)
*/
synchronize_rcu();

/* The only possible user of bpf_cpu_map_entry is
* cpu_map_kthread_run().
*/
/* The only possible user of bpf_cpu_map_entry is the NAPI-kthread. */
for (i = 0; i < cmap->map.max_entries; i++) {
struct bpf_cpu_map_entry *rcpu;

rcpu = rcu_dereference_raw(cmap->cpu_map[i]);
if (!rcpu)
continue;

/* Stop kthread and cleanup entry directly */
/* Stop NAPI-kthread and cleanup entry directly */
__cpu_map_entry_free(&rcpu->free_work.work);
}
bpf_map_area_free(cmap->cpu_map);
@@ -755,7 +736,7 @@ int cpu_map_generic_redirect(struct bpf_cpu_map_entry *rcpu,
if (ret < 0)
goto trace;

wake_up_process(rcpu->kthread);
napi_schedule(&rcpu->napi);
trace:
trace_xdp_cpumap_enqueue(rcpu->map_id, !ret, !!ret, rcpu->cpu);
return ret;
@@ -767,8 +748,6 @@ void __cpu_map_flush(struct list_head *flush_list)

list_for_each_entry_safe(bq, tmp, flush_list, flush_node) {
bq_flush_to_queue(bq);

/* If already running, costs spin_lock_irqsave + smb_mb */
wake_up_process(bq->obj->kthread);
napi_schedule(&bq->obj->napi);
}
}
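For reference, the NAPI-driven GRO path above is only exercised when an XDP program redirects packets into a CPUMAP. A minimal, illustrative BPF-side sketch (not part of this diff; the map size, program name and target CPU are arbitrary assumptions):

// Illustrative XDP program: steer every packet to CPU 0 of a CPUMAP so the
// cpumap NAPI-kthread on that CPU builds the skbs and runs GRO on them.
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_CPUMAP);
	__uint(max_entries, 64);
	__type(key, __u32);
	__type(value, struct bpf_cpumap_val);
} cpu_map SEC(".maps");

SEC("xdp")
int xdp_redirect_cpu(struct xdp_md *ctx)
{
	__u32 target_cpu = 0;	/* arbitrary choice for the example */

	/* Fall back to XDP_PASS if the chosen entry is not populated. */
	return bpf_redirect_map(&cpu_map, target_cpu, XDP_PASS);
}

char _license[] SEC("license") = "GPL";

User space still has to populate the chosen entry with a queue size (and optionally a second-level program fd) before redirecting; a sketch of that follows the net/core/dev.c changes below.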
21 changes: 15 additions & 6 deletions net/core/dev.c
@@ -1419,8 +1419,6 @@ void netdev_notify_peers(struct net_device *dev)
}
EXPORT_SYMBOL(netdev_notify_peers);

static int napi_threaded_poll(void *data);

static int napi_kthread_create(struct napi_struct *n)
{
int err = 0;
@@ -6723,13 +6721,14 @@ static void napi_save_config(struct napi_struct *n)
napi_hash_del(n);
}

void netif_napi_add_weight(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight)
int napi_init_for_gro(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight)
{
if (WARN_ON(test_and_set_bit(NAPI_STATE_LISTED, &napi->state)))
return;
return -EBUSY;

INIT_LIST_HEAD(&napi->poll_list);
INIT_LIST_HEAD(&napi->dev_list);
INIT_HLIST_NODE(&napi->napi_hash_node);
hrtimer_init(&napi->timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL_PINNED);
napi->timer.function = napi_watchdog;
@@ -6747,6 +6746,16 @@ void netif_napi_add_weight(struct net_device *dev, struct napi_struct *napi,
napi->poll_owner = -1;
#endif
napi->list_owner = -1;

return 0;
}

void netif_napi_add_weight(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight)
{
if (napi_init_for_gro(dev, napi, poll, weight))
return;

set_bit(NAPI_STATE_SCHED, &napi->state);
set_bit(NAPI_STATE_NPSVC, &napi->state);
list_add_rcu(&napi->dev_list, &dev->napi_list);
@@ -7016,7 +7025,7 @@ static void napi_threaded_poll_loop(struct napi_struct *napi)
}
}

static int napi_threaded_poll(void *data)
int napi_threaded_poll(void *data)
{
struct napi_struct *napi = data;
