author    | Jason A. Donenfeld <Jason@zx2c4.com> | 2017-06-07 01:39:08 -0500
committer | Jason A. Donenfeld <Jason@zx2c4.com> | 2017-09-18 17:38:16 +0200
commit    | 0d341761c44739f9c53128fd3e101f83fe60b969 (patch)
tree      | 1ce7852d663aa916295c4b1b369f3c1ee8bca1f1 /src/send.c
parent    | 9ffe12e8d9742baf02b08236ed5c4b0de807434a (diff)
queue: entirely rework parallel system
This removes our dependency on padata and moves to a different mode of
multiprocessing that is more efficient.
This began as Samuel Holland's GSoC project and was gradually
reworked/redesigned/rebased into this present commit, which is a
combination of his initial contribution and my subsequent rewriting and
redesigning.
Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
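The diff below replaces the old padata-based path with a per-device pool of encryption workers (packet_encrypt_worker) pulling from a shared multicore queue, plus a per-peer serial queue drained by packet_tx_worker, which only releases the context at the head of the queue once its is_finished flag is set, so packets leave in the order they were staged even though they are encrypted on any CPU in any order. What follows is a minimal userspace sketch of that ordering discipline only, under the assumption that a mutex-protected singly linked list is an acceptable stand-in for the kernel queue primitives; every identifier in it (ctx, peer_queue, drain_in_order, and so on) is invented for illustration and none of it is the kernel code.

```c
/* Illustrative userspace analogy, not the kernel implementation: contexts are
 * queued per peer in submission order, finished by workers in any order, and a
 * serial drain step releases them strictly from the head once finished. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

struct ctx {
	struct ctx *next;
	atomic_bool is_finished;
	int seq; /* stands in for a batch of packets */
};

struct peer_queue {
	struct ctx *head, *tail;
	pthread_mutex_t lock;
};

static void enqueue(struct peer_queue *q, struct ctx *c)
{
	pthread_mutex_lock(&q->lock);
	c->next = NULL;
	if (q->tail)
		q->tail->next = c;
	else
		q->head = c;
	q->tail = c;
	pthread_mutex_unlock(&q->lock);
}

/* Analogue of the per-peer transmission step: only ever pop the head, and only
 * once it is marked finished, so completion order equals submission order. */
static void drain_in_order(struct peer_queue *q)
{
	for (;;) {
		pthread_mutex_lock(&q->lock);
		struct ctx *c = q->head;
		if (!c || !atomic_load(&c->is_finished)) {
			pthread_mutex_unlock(&q->lock);
			return;
		}
		q->head = c->next;
		if (!q->head)
			q->tail = NULL;
		pthread_mutex_unlock(&q->lock);
		printf("transmit batch %d\n", c->seq);
		free(c);
	}
}

int main(void)
{
	struct peer_queue q = { .lock = PTHREAD_MUTEX_INITIALIZER };
	struct ctx *batches[3];

	for (int i = 0; i < 3; i++) {
		batches[i] = calloc(1, sizeof(*batches[i]));
		atomic_init(&batches[i]->is_finished, false);
		batches[i]->seq = i;
		enqueue(&q, batches[i]);
	}
	/* Pretend parallel workers finish out of order: batch 2 first. */
	atomic_store(&batches[2]->is_finished, true);
	drain_in_order(&q); /* prints nothing: head (batch 0) not finished yet */
	atomic_store(&batches[0]->is_finished, true);
	atomic_store(&batches[1]->is_finished, true);
	drain_in_order(&q); /* now prints batches 0, 1, 2 in order */
	return 0;
}
```

In the commit itself this role is played by packet_tx_worker() with queue_first_per_peer()/queue_dequeue(), and the is_finished flag is set by packet_encrypt_worker() only after it has taken its own reference to the peer, since dereferencing the context is no longer safe once the flag is visible.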
Diffstat (limited to 'src/send.c')
-rw-r--r-- | src/send.c | 234
1 file changed, 174 insertions(+), 60 deletions(-)
@@ -1,6 +1,6 @@
 /* Copyright (C) 2015-2017 Jason A. Donenfeld <Jason@zx2c4.com>. All Rights Reserved. */
 
-#include "packets.h"
+#include "queueing.h"
 #include "timers.h"
 #include "device.h"
 #include "peer.h"
@@ -12,6 +12,7 @@
 #include <linux/inetdevice.h>
 #include <linux/socket.h>
 #include <linux/jiffies.h>
+#include <net/ip_tunnels.h>
 #include <net/udp.h>
 #include <net/sock.h>
 
@@ -37,14 +38,14 @@ static void packet_send_handshake_initiation(struct wireguard_peer *peer)
 	}
 }
 
-void packet_send_queued_handshakes(struct work_struct *work)
+void packet_handshake_send_worker(struct work_struct *work)
 {
 	struct wireguard_peer *peer = container_of(work, struct wireguard_peer, transmit_handshake_work);
 	packet_send_handshake_initiation(peer);
 	peer_put(peer);
 }
 
-void packet_queue_handshake_initiation(struct wireguard_peer *peer, bool is_retry)
+void packet_send_queued_handshake_initiation(struct wireguard_peer *peer, bool is_retry)
 {
 	if (!is_retry)
 		peer->timer_handshake_attempts = 0;
@@ -56,7 +57,7 @@ void packet_queue_handshake_initiation(struct wireguard_peer *peer, bool is_retry)
 
 	peer = peer_rcu_get(peer);
 	/* Queues up calling packet_send_queued_handshakes(peer), where we do a peer_put(peer) after: */
-	if (!queue_work(peer->device->peer_wq, &peer->transmit_handshake_work))
+	if (!queue_work(peer->device->handshake_send_wq, &peer->transmit_handshake_work))
 		peer_put(peer); /* If the work was already queued, we want to drop the extra reference */
 }
 
@@ -100,25 +101,70 @@ static inline void keep_key_fresh(struct wireguard_peer *peer)
 	rcu_read_unlock_bh();
 
 	if (send)
-		packet_queue_handshake_initiation(peer, false);
+		packet_send_queued_handshake_initiation(peer, false);
+}
+
+static inline bool skb_encrypt(struct sk_buff *skb, struct noise_keypair *keypair, bool have_simd)
+{
+	struct scatterlist sg[MAX_SKB_FRAGS * 2 + 1];
+	struct message_data *header;
+	unsigned int padding_len, plaintext_len, trailer_len;
+	int num_frags;
+	struct sk_buff *trailer;
+
+	/* Calculate lengths */
+	padding_len = skb_padding(skb);
+	trailer_len = padding_len + noise_encrypted_len(0);
+	plaintext_len = skb->len + padding_len;
+
+	/* Expand data section to have room for padding and auth tag */
+	num_frags = skb_cow_data(skb, trailer_len, &trailer);
+	if (unlikely(num_frags < 0 || num_frags > ARRAY_SIZE(sg)))
+		return false;
+
+	/* Set the padding to zeros, and make sure it and the auth tag are part of the skb */
+	memset(skb_tail_pointer(trailer), 0, padding_len);
+
+	/* Expand head section to have room for our header and the network stack's headers. */
+	if (unlikely(skb_cow_head(skb, DATA_PACKET_HEAD_ROOM) < 0))
+		return false;
+
+	/* We have to remember to add the checksum to the innerpacket, in case the receiver forwards it. */
+	if (likely(!skb_checksum_setup(skb, true)))
+		skb_checksum_help(skb);
+
+	/* Only after checksumming can we safely add on the padding at the end and the header. */
+	header = (struct message_data *)skb_push(skb, sizeof(struct message_data));
+	header->header.type = cpu_to_le32(MESSAGE_DATA);
+	header->key_idx = keypair->remote_index;
+	header->counter = cpu_to_le64(PACKET_CB(skb)->nonce);
+	pskb_put(skb, trailer, trailer_len);
+
+	/* Now we can encrypt the scattergather segments */
+	sg_init_table(sg, num_frags);
+	if (skb_to_sgvec(skb, sg, sizeof(struct message_data), noise_encrypted_len(plaintext_len)) <= 0)
+		return false;
+	return chacha20poly1305_encrypt_sg(sg, sg, plaintext_len, NULL, 0, PACKET_CB(skb)->nonce, keypair->sending.key, have_simd);
 }
 
 void packet_send_keepalive(struct wireguard_peer *peer)
 {
 	struct sk_buff *skb;
-	if (skb_queue_empty(&peer->tx_packet_queue)) {
+
+	if (skb_queue_empty(&peer->staged_packet_queue)) {
 		skb = alloc_skb(DATA_PACKET_HEAD_ROOM + MESSAGE_MINIMUM_LENGTH, GFP_ATOMIC);
 		if (unlikely(!skb))
			return;
 		skb_reserve(skb, DATA_PACKET_HEAD_ROOM);
 		skb->dev = peer->device->dev;
-		skb_queue_tail(&peer->tx_packet_queue, skb);
+		skb_queue_tail(&peer->staged_packet_queue, skb);
 		net_dbg_ratelimited("%s: Sending keepalive packet to peer %Lu (%pISpfsc)\n", peer->device->dev->name, peer->internal_id, &peer->endpoint.addr);
 	}
-	packet_send_queue(peer);
+
+	packet_send_staged_packets(peer);
 }
 
-void packet_create_data_done(struct sk_buff_head *queue, struct wireguard_peer *peer)
+static void packet_create_data_done(struct sk_buff_head *queue, struct wireguard_peer *peer)
 {
 	struct sk_buff *skb, *tmp;
 	bool is_keepalive, data_sent = false;
@@ -136,65 +182,133 @@ void packet_create_data_done(struct sk_buff_head *queue, struct wireguard_peer *peer)
 		timers_data_sent(peer);
 	keep_key_fresh(peer);
+}
 
-	if (unlikely(peer->need_resend_queue))
-		packet_send_queue(peer);
+void packet_tx_worker(struct work_struct *work)
+{
+	struct crypt_queue *queue = container_of(work, struct crypt_queue, work);
+	struct crypt_ctx *ctx;
+
+	while ((ctx = queue_first_per_peer(queue)) != NULL && atomic_read(&ctx->is_finished)) {
+		queue_dequeue(queue);
+		packet_create_data_done(&ctx->packets, ctx->peer);
+		peer_put(ctx->peer);
+		kmem_cache_free(crypt_ctx_cache, ctx);
+	}
 }
 
-void packet_send_queue(struct wireguard_peer *peer)
+void packet_encrypt_worker(struct work_struct *work)
 {
-	struct sk_buff_head queue;
-	struct sk_buff *skb;
+	struct crypt_ctx *ctx;
+	struct crypt_queue *queue = container_of(work, struct multicore_worker, work)->ptr;
+	struct sk_buff *skb, *tmp;
+	struct wireguard_peer *peer;
+	bool have_simd = chacha20poly1305_init_simd();
+
+	while ((ctx = queue_dequeue_per_device(queue)) != NULL) {
+		skb_queue_walk_safe(&ctx->packets, skb, tmp) {
+			if (likely(skb_encrypt(skb, ctx->keypair, have_simd))) {
+				skb_reset(skb);
+			} else {
+				__skb_unlink(skb, &ctx->packets);
+				dev_kfree_skb(skb);
+			}
+		}
+		/* Dereferencing ctx is unsafe once ctx->is_finished == true, so
+		 * we grab an additional reference to peer. */
+		peer = peer_rcu_get(ctx->peer);
+		atomic_set(&ctx->is_finished, true);
+		queue_work_on(choose_cpu(&peer->serial_work_cpu, peer->internal_id), peer->device->packet_crypt_wq, &peer->tx_queue.work);
+		peer_put(peer);
+	}
+	chacha20poly1305_deinit_simd(have_simd);
 }
 
-	peer->need_resend_queue = false;
+static void packet_create_data(struct wireguard_peer *peer, struct sk_buff_head *packets, struct noise_keypair *keypair)
+{
+	struct crypt_ctx *ctx;
+	struct wireguard_device *wg = peer->device;
 
-	/* Steal the current queue into our local one. */
-	skb_queue_head_init(&queue);
-	spin_lock_bh(&peer->tx_packet_queue.lock);
-	skb_queue_splice_init(&peer->tx_packet_queue, &queue);
-	spin_unlock_bh(&peer->tx_packet_queue.lock);
+	ctx = kmem_cache_zalloc(crypt_ctx_cache, GFP_ATOMIC);
+	if (unlikely(!ctx)) {
+		skb_queue_purge(packets);
+		goto err_drop_refs;
+	}
+	/* This function consumes the passed references to peer and keypair. */
+	ctx->keypair = keypair;
+	ctx->peer = peer;
+	__skb_queue_head_init(&ctx->packets);
+	skb_queue_splice_tail(packets, &ctx->packets);
+	if (likely(queue_enqueue_per_device_and_peer(&wg->encrypt_queue, &peer->tx_queue, ctx, wg->packet_crypt_wq, &wg->encrypt_queue.last_cpu)))
+		return; /* Successful. No need to fall through to drop references below. */
+
+	skb_queue_purge(&ctx->packets);
+	kmem_cache_free(crypt_ctx_cache, ctx);
+
+err_drop_refs:
+	noise_keypair_put(keypair);
+	peer_put(peer);
 }
 
-	if (unlikely(skb_queue_empty(&queue)))
+void packet_send_staged_packets(struct wireguard_peer *peer)
+{
+	struct noise_keypair *keypair;
+	struct noise_symmetric_key *key;
+	struct sk_buff_head packets;
+	struct sk_buff *skb;
+
+	/* Steal the current queue into our local one. */
+	__skb_queue_head_init(&packets);
+	spin_lock_bh(&peer->staged_packet_queue.lock);
+	skb_queue_splice_init(&peer->staged_packet_queue, &packets);
+	spin_unlock_bh(&peer->staged_packet_queue.lock);
+	if (unlikely(skb_queue_empty(&packets)))
 		return;
 
-	/* We submit it for encryption and sending. */
-	switch (packet_create_data(&queue, peer)) {
-	case 0:
-		break;
-	case -EBUSY:
-		/* EBUSY happens when the parallel workers are all filled up, in which
-		 * case we should requeue everything. */
-
-		/* First, we mark that we should try to do this later, when existing
-		 * jobs are done. */
-		peer->need_resend_queue = true;
-
-		/* We stick the remaining skbs from local_queue at the top of the peer's
-		 * queue again, setting the top of local_queue to be the skb that begins
-		 * the requeueing. */
-		spin_lock_bh(&peer->tx_packet_queue.lock);
-		skb_queue_splice(&queue, &peer->tx_packet_queue);
-		spin_unlock_bh(&peer->tx_packet_queue.lock);
-		break;
-	case -ENOKEY:
-		/* ENOKEY means that we don't have a valid session for the peer, which
-		 * means we should initiate a session, but after requeuing like above.
-		 * Since we'll be queuing these up for potentially a little while, we
-		 * first make sure they're no longer using up a socket's write buffer. */
-
-		skb_queue_walk (&queue, skb)
-			skb_orphan(skb);
-
-		spin_lock_bh(&peer->tx_packet_queue.lock);
-		skb_queue_splice(&queue, &peer->tx_packet_queue);
-		spin_unlock_bh(&peer->tx_packet_queue.lock);
-
-		packet_queue_handshake_initiation(peer, false);
-		break;
-	default:
-		/* If we failed for any other reason, we want to just free the packets and
-		 * forget about them. We do this unlocked, since we're the only ones with
-		 * a reference to the local queue. */
-		__skb_queue_purge(&queue);
+	/* First we make sure we have a valid reference to a valid key. */
+	rcu_read_lock_bh();
+	keypair = noise_keypair_get(rcu_dereference_bh(peer->keypairs.current_keypair));
+	rcu_read_unlock_bh();
+	if (unlikely(!keypair))
+		goto out_nokey;
+	key = &keypair->sending;
+	if (unlikely(!key || !key->is_valid))
+		goto out_nokey;
+	if (unlikely(time_is_before_eq_jiffies64(key->birthdate + REJECT_AFTER_TIME)))
+		goto out_invalid;
+
+	/* After we know we have a somewhat valid key, we now try to assign nonces to
	 * all of the packets in the queue. If we can't assign nonces for all of them,
	 * we just consider it a failure and wait for the next handshake. */
+	skb_queue_walk (&packets, skb) {
+		PACKET_CB(skb)->ds = ip_tunnel_ecn_encap(0 /* No outer TOS: no leak. TODO: should we use flowi->tos as outer? */, ip_hdr(skb), skb);
+		PACKET_CB(skb)->nonce = atomic64_inc_return(&key->counter.counter) - 1;
+		if (unlikely(PACKET_CB(skb)->nonce >= REJECT_AFTER_MESSAGES))
+			goto out_invalid;
 	}
+
+	/* We pass off our peer and keypair references too the data subsystem and return. */
+	packet_create_data(peer_rcu_get(peer), &packets, keypair);
+	return;
+
+out_invalid:
+	key->is_valid = false;
+out_nokey:
+	noise_keypair_put(keypair);
+
+	/* We orphan the packets if we're waiting on a handshake, so that they
	 * don't block a socket's pool. */
+	skb_queue_walk (&packets, skb)
+		skb_orphan(skb);
+	/* Then we put them back on the top of the queue. We're not too concerned about
	 * accidently getting things a little out of order if packets are being added
	 * really fast, because this queue is for before packets can even be sent and
	 * it's small anyway. */
+	spin_lock_bh(&peer->staged_packet_queue.lock);
+	skb_queue_splice(&packets, &peer->staged_packet_queue);
+	spin_unlock_bh(&peer->staged_packet_queue.lock);
+
+	/* If we're exiting because there's something wrong with the key, it means
	 * we should initiate a new handshake. */
+	packet_send_queued_handshake_initiation(peer, false);
 }
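One detail worth pulling out of packet_send_staged_packets() above: each staged packet reserves its nonce with atomic64_inc_return(&key->counter.counter) - 1, so concurrent callers can never hand out the same value, and a reserved nonce at or beyond REJECT_AFTER_MESSAGES takes the out_invalid path, marks the key invalid, and falls through to requeue the packets and trigger packet_send_queued_handshake_initiation(). Below is a minimal userspace sketch of just that reservation rule, with an invented reserve_nonce() helper and a stand-in limit rather than the real protocol constant; it is an illustration, not the kernel code.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define REJECT_AFTER_MESSAGES 4 /* illustrative stand-in, not the protocol value */

struct sending_key {
	atomic_uint_fast64_t counter;
	bool is_valid;
};

/* Reserve the next nonce atomically; mirrors the inc_return-minus-one pattern
 * and the limit check that invalidates the key when it is exhausted. */
static bool reserve_nonce(struct sending_key *key, uint64_t *nonce)
{
	*nonce = atomic_fetch_add(&key->counter, 1); /* old value == inc_return - 1 */
	if (*nonce >= REJECT_AFTER_MESSAGES) {
		key->is_valid = false; /* analogous to the out_invalid path */
		return false;
	}
	return true;
}

int main(void)
{
	struct sending_key key = { .is_valid = true };
	uint64_t nonce;

	atomic_init(&key.counter, 0);
	for (int i = 0; i < 6; i++) {
		if (reserve_nonce(&key, &nonce))
			printf("packet %d gets nonce %llu\n", i, (unsigned long long)nonce);
		else
			printf("packet %d refused; key needs a new handshake\n", i);
	}
	return 0;
}
```

atomic_fetch_add() returns the counter's previous value, which is the same quantity the kernel code obtains by subtracting one from atomic64_inc_return().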