diff options
-rw-r--r-- | paramiko/transport.py | 188 |
1 files changed, 75 insertions, 113 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py index 847005d9..f0fcb979 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -244,52 +244,44 @@ class Transport(threading.Thread, ClosingContextManager): "class": algorithms.AES, "mode": modes.CTR, "block-size": 16, - "iv-size": 16, "key-size": 16, }, "aes192-ctr": { "class": algorithms.AES, "mode": modes.CTR, "block-size": 16, - "iv-size": 16, "key-size": 24, }, "aes256-ctr": { "class": algorithms.AES, "mode": modes.CTR, "block-size": 16, - "iv-size": 16, "key-size": 32, }, "aes128-cbc": { "class": algorithms.AES, "mode": modes.CBC, "block-size": 16, - "iv-size": 16, "key-size": 16, }, "aes192-cbc": { "class": algorithms.AES, "mode": modes.CBC, "block-size": 16, - "iv-size": 16, "key-size": 24, }, "aes256-cbc": { "class": algorithms.AES, "mode": modes.CBC, "block-size": 16, - "iv-size": 16, "key-size": 32, }, "3des-cbc": { "class": TripleDES, "mode": modes.CBC, "block-size": 8, - "iv-size": 8, "key-size": 24, }, - # aead cipher "aes128-gcm@openssh.com": { "class": aead.AESGCM, "block-size": 16, @@ -2054,23 +2046,26 @@ class Transport(threading.Thread, ClosingContextManager): sofar += digest return out[:nbytes] - def _get_cipher(self, name, key, iv, operation): + def _get_engine(self, name, key, iv=None, operation=None, aead=False): if name not in self._cipher_info: - raise SSHException("Unknown client cipher " + name) + raise SSHException("Unknown cipher " + name) + info = self._cipher_info[name] + algorithm = info["class"](key) + # AEAD types (eg GCM) use their algorithm class /as/ the encryption + # engine (they expose the same encrypt/decrypt API as a CipherContext) + if aead: + return algorithm + # All others go through the Cipher class. + cipher = Cipher( + algorithm=algorithm, + # TODO: why is this getting tickled in aesgcm mode??? + mode=info["mode"](iv), + backend=default_backend(), + ) + if operation is self._ENCRYPT: + return cipher.encryptor() else: - cipher = Cipher( - self._cipher_info[name]["class"](key), - self._cipher_info[name]["mode"](iv), - backend=default_backend(), - ) - if operation is self._ENCRYPT: - return cipher.encryptor() - else: - return cipher.decryptor() - - def _get_aead_cipher(self, name, key): - aead_cipher = self._cipher_info[name]["class"](key) - return aead_cipher + return cipher.decryptor() def _set_forward_agent_handler(self, handler): if handler is None: @@ -2738,35 +2733,27 @@ class Transport(threading.Thread, ClosingContextManager): def _activate_inbound(self): """switch on newly negotiated encryption parameters for inbound traffic""" - block_size = self._cipher_info[self.remote_cipher]["block-size"] + info = self._cipher_info[self.remote_cipher] + aead = info.get("is_aead", False) + block_size = info["block-size"] + key_size = info["key-size"] + # Non-AEAD/GCM type ciphers' IV size is their block size. + iv_size = info.get("iv-size", block_size) if self.server_mode: - IV_in = self._compute_key( - "A", self._cipher_info[self.remote_cipher]["iv-size"] - ) - key_in = self._compute_key( - "C", self._cipher_info[self.remote_cipher]["key-size"] - ) + iv_in = self._compute_key("A", iv_size) + key_in = self._compute_key("C", key_size) else: - IV_in = self._compute_key( - "B", self._cipher_info[self.remote_cipher]["iv-size"] - ) - key_in = self._compute_key( - "D", self._cipher_info[self.remote_cipher]["key-size"] - ) - - is_aead = ( - True - if self._cipher_info[self.remote_cipher].get("is_aead") - else False + iv_in = self._compute_key("B", iv_size) + key_in = self._compute_key("D", key_size) + + engine = self._get_engine( + name=self.remote_cipher, + key=key_in, + iv=iv_in, + operation=self._DECRYPT, + aead=aead, ) - - if is_aead: - engine = self._get_aead_cipher(self.remote_cipher, key_in) - else: - engine = self._get_cipher( - self.remote_cipher, key_in, IV_in, self._DECRYPT - ) - etm = "etm@openssh.com" in self.remote_mac + etm = (not aead) and "etm@openssh.com" in self.remote_mac mac_size = self._mac_info[self.remote_mac]["size"] mac_engine = self._mac_info[self.remote_mac]["class"] # initial mac keys are done in the hash's natural size (not the @@ -2776,22 +2763,16 @@ class Transport(threading.Thread, ClosingContextManager): else: mac_key = self._compute_key("F", mac_engine().digest_size) - if is_aead: - self._log(DEBUG, "use aead-cipher, so set mac to None") - self.packetizer.set_inbound_cipher( - engine, - block_size, - None, - 16, - bytes(), - etm=False, - aead=is_aead, - iv_in=IV_in, - ) - else: - self.packetizer.set_inbound_cipher( - engine, block_size, mac_engine, mac_size, mac_key, etm=etm - ) + self.packetizer.set_inbound_cipher( + block_engine=engine, + block_size=block_size, + mac_engine=None if aead else mac_engine, + mac_size=16 if aead else mac_size, + mac_key=None if aead else mac_key, + etm=etm, + aead=aead, + iv_in=iv_in if aead else None, + ) compress_in = self._compression_info[self.remote_compression][1] if compress_in is not None and ( @@ -2820,35 +2801,27 @@ class Transport(threading.Thread, ClosingContextManager): "Resetting outbound seqno after NEWKEYS due to strict mode", ) self.packetizer.reset_seqno_out() - block_size = self._cipher_info[self.local_cipher]["block-size"] + info = self._cipher_info[self.local_cipher] + aead = info.get("is_aead", False) + block_size = info["block-size"] + key_size = info["key-size"] + # Non-AEAD/GCM type ciphers' IV size is their block size. + iv_size = info.get("iv-size", block_size) if self.server_mode: - IV_out = self._compute_key( - "B", self._cipher_info[self.local_cipher]["iv-size"] - ) - key_out = self._compute_key( - "D", self._cipher_info[self.local_cipher]["key-size"] - ) + iv_out = self._compute_key("B", iv_size) + key_out = self._compute_key("D", key_size) else: - IV_out = self._compute_key( - "A", self._cipher_info[self.local_cipher]["iv-size"] - ) - key_out = self._compute_key( - "C", self._cipher_info[self.local_cipher]["key-size"] - ) - - is_aead = ( - True - if self._cipher_info[self.local_cipher].get("is_aead") - else False + iv_out = self._compute_key("A", iv_size) + key_out = self._compute_key("C", key_size) + + engine = self._get_engine( + name=self.local_cipher, + key=key_out, + iv=iv_out, + operation=self._ENCRYPT, + aead=aead, ) - - if is_aead: - engine = self._get_aead_cipher(self.local_cipher, key_out) - else: - engine = self._get_cipher( - self.local_cipher, key_out, IV_out, self._ENCRYPT - ) - etm = "etm@openssh.com" in self.local_mac + etm = (not aead) and "etm@openssh.com" in self.local_mac mac_size = self._mac_info[self.local_mac]["size"] mac_engine = self._mac_info[self.local_mac]["class"] # initial mac keys are done in the hash's natural size (not the @@ -2859,28 +2832,17 @@ class Transport(threading.Thread, ClosingContextManager): mac_key = self._compute_key("E", mac_engine().digest_size) sdctr = self.local_cipher.endswith("-ctr") - if is_aead: - self.packetizer.set_outbound_cipher( - engine, - block_size, - None, - 16, - bytes(), - sdctr, - etm=False, - aead=is_aead, - iv_out=IV_out, - ) - else: - self.packetizer.set_outbound_cipher( - engine, - block_size, - mac_engine, - mac_size, - mac_key, - sdctr, - etm=etm, - ) + self.packetizer.set_outbound_cipher( + block_engine=engine, + block_size=block_size, + mac_engine=None if aead else mac_engine, + mac_size=16 if aead else mac_size, + mac_key=None if aead else mac_key, + sdctr=sdctr, + etm=etm, + aead=aead, + iv_out=iv_out if aead else None, + ) compress_out = self._compression_info[self.local_compression][0] if compress_out is not None and ( |