summaryrefslogtreecommitdiffhomepage
path: root/paramiko/transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/transport.py')
-rw-r--r--paramiko/transport.py188
1 files changed, 75 insertions, 113 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 847005d9..f0fcb979 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -244,52 +244,44 @@ class Transport(threading.Thread, ClosingContextManager):
"class": algorithms.AES,
"mode": modes.CTR,
"block-size": 16,
- "iv-size": 16,
"key-size": 16,
},
"aes192-ctr": {
"class": algorithms.AES,
"mode": modes.CTR,
"block-size": 16,
- "iv-size": 16,
"key-size": 24,
},
"aes256-ctr": {
"class": algorithms.AES,
"mode": modes.CTR,
"block-size": 16,
- "iv-size": 16,
"key-size": 32,
},
"aes128-cbc": {
"class": algorithms.AES,
"mode": modes.CBC,
"block-size": 16,
- "iv-size": 16,
"key-size": 16,
},
"aes192-cbc": {
"class": algorithms.AES,
"mode": modes.CBC,
"block-size": 16,
- "iv-size": 16,
"key-size": 24,
},
"aes256-cbc": {
"class": algorithms.AES,
"mode": modes.CBC,
"block-size": 16,
- "iv-size": 16,
"key-size": 32,
},
"3des-cbc": {
"class": TripleDES,
"mode": modes.CBC,
"block-size": 8,
- "iv-size": 8,
"key-size": 24,
},
- # aead cipher
"aes128-gcm@openssh.com": {
"class": aead.AESGCM,
"block-size": 16,
@@ -2054,23 +2046,26 @@ class Transport(threading.Thread, ClosingContextManager):
sofar += digest
return out[:nbytes]
- def _get_cipher(self, name, key, iv, operation):
+ def _get_engine(self, name, key, iv=None, operation=None, aead=False):
if name not in self._cipher_info:
- raise SSHException("Unknown client cipher " + name)
+ raise SSHException("Unknown cipher " + name)
+ info = self._cipher_info[name]
+ algorithm = info["class"](key)
+ # AEAD types (eg GCM) use their algorithm class /as/ the encryption
+ # engine (they expose the same encrypt/decrypt API as a CipherContext)
+ if aead:
+ return algorithm
+ # All others go through the Cipher class.
+ cipher = Cipher(
+ algorithm=algorithm,
+ # TODO: why is this getting tickled in aesgcm mode???
+ mode=info["mode"](iv),
+ backend=default_backend(),
+ )
+ if operation is self._ENCRYPT:
+ return cipher.encryptor()
else:
- cipher = Cipher(
- self._cipher_info[name]["class"](key),
- self._cipher_info[name]["mode"](iv),
- backend=default_backend(),
- )
- if operation is self._ENCRYPT:
- return cipher.encryptor()
- else:
- return cipher.decryptor()
-
- def _get_aead_cipher(self, name, key):
- aead_cipher = self._cipher_info[name]["class"](key)
- return aead_cipher
+ return cipher.decryptor()
def _set_forward_agent_handler(self, handler):
if handler is None:
@@ -2738,35 +2733,27 @@ class Transport(threading.Thread, ClosingContextManager):
def _activate_inbound(self):
"""switch on newly negotiated encryption parameters for
inbound traffic"""
- block_size = self._cipher_info[self.remote_cipher]["block-size"]
+ info = self._cipher_info[self.remote_cipher]
+ aead = info.get("is_aead", False)
+ block_size = info["block-size"]
+ key_size = info["key-size"]
+ # Non-AEAD/GCM type ciphers' IV size is their block size.
+ iv_size = info.get("iv-size", block_size)
if self.server_mode:
- IV_in = self._compute_key(
- "A", self._cipher_info[self.remote_cipher]["iv-size"]
- )
- key_in = self._compute_key(
- "C", self._cipher_info[self.remote_cipher]["key-size"]
- )
+ iv_in = self._compute_key("A", iv_size)
+ key_in = self._compute_key("C", key_size)
else:
- IV_in = self._compute_key(
- "B", self._cipher_info[self.remote_cipher]["iv-size"]
- )
- key_in = self._compute_key(
- "D", self._cipher_info[self.remote_cipher]["key-size"]
- )
-
- is_aead = (
- True
- if self._cipher_info[self.remote_cipher].get("is_aead")
- else False
+ iv_in = self._compute_key("B", iv_size)
+ key_in = self._compute_key("D", key_size)
+
+ engine = self._get_engine(
+ name=self.remote_cipher,
+ key=key_in,
+ iv=iv_in,
+ operation=self._DECRYPT,
+ aead=aead,
)
-
- if is_aead:
- engine = self._get_aead_cipher(self.remote_cipher, key_in)
- else:
- engine = self._get_cipher(
- self.remote_cipher, key_in, IV_in, self._DECRYPT
- )
- etm = "etm@openssh.com" in self.remote_mac
+ etm = (not aead) and "etm@openssh.com" in self.remote_mac
mac_size = self._mac_info[self.remote_mac]["size"]
mac_engine = self._mac_info[self.remote_mac]["class"]
# initial mac keys are done in the hash's natural size (not the
@@ -2776,22 +2763,16 @@ class Transport(threading.Thread, ClosingContextManager):
else:
mac_key = self._compute_key("F", mac_engine().digest_size)
- if is_aead:
- self._log(DEBUG, "use aead-cipher, so set mac to None")
- self.packetizer.set_inbound_cipher(
- engine,
- block_size,
- None,
- 16,
- bytes(),
- etm=False,
- aead=is_aead,
- iv_in=IV_in,
- )
- else:
- self.packetizer.set_inbound_cipher(
- engine, block_size, mac_engine, mac_size, mac_key, etm=etm
- )
+ self.packetizer.set_inbound_cipher(
+ block_engine=engine,
+ block_size=block_size,
+ mac_engine=None if aead else mac_engine,
+ mac_size=16 if aead else mac_size,
+ mac_key=None if aead else mac_key,
+ etm=etm,
+ aead=aead,
+ iv_in=iv_in if aead else None,
+ )
compress_in = self._compression_info[self.remote_compression][1]
if compress_in is not None and (
@@ -2820,35 +2801,27 @@ class Transport(threading.Thread, ClosingContextManager):
"Resetting outbound seqno after NEWKEYS due to strict mode",
)
self.packetizer.reset_seqno_out()
- block_size = self._cipher_info[self.local_cipher]["block-size"]
+ info = self._cipher_info[self.local_cipher]
+ aead = info.get("is_aead", False)
+ block_size = info["block-size"]
+ key_size = info["key-size"]
+ # Non-AEAD/GCM type ciphers' IV size is their block size.
+ iv_size = info.get("iv-size", block_size)
if self.server_mode:
- IV_out = self._compute_key(
- "B", self._cipher_info[self.local_cipher]["iv-size"]
- )
- key_out = self._compute_key(
- "D", self._cipher_info[self.local_cipher]["key-size"]
- )
+ iv_out = self._compute_key("B", iv_size)
+ key_out = self._compute_key("D", key_size)
else:
- IV_out = self._compute_key(
- "A", self._cipher_info[self.local_cipher]["iv-size"]
- )
- key_out = self._compute_key(
- "C", self._cipher_info[self.local_cipher]["key-size"]
- )
-
- is_aead = (
- True
- if self._cipher_info[self.local_cipher].get("is_aead")
- else False
+ iv_out = self._compute_key("A", iv_size)
+ key_out = self._compute_key("C", key_size)
+
+ engine = self._get_engine(
+ name=self.local_cipher,
+ key=key_out,
+ iv=iv_out,
+ operation=self._ENCRYPT,
+ aead=aead,
)
-
- if is_aead:
- engine = self._get_aead_cipher(self.local_cipher, key_out)
- else:
- engine = self._get_cipher(
- self.local_cipher, key_out, IV_out, self._ENCRYPT
- )
- etm = "etm@openssh.com" in self.local_mac
+ etm = (not aead) and "etm@openssh.com" in self.local_mac
mac_size = self._mac_info[self.local_mac]["size"]
mac_engine = self._mac_info[self.local_mac]["class"]
# initial mac keys are done in the hash's natural size (not the
@@ -2859,28 +2832,17 @@ class Transport(threading.Thread, ClosingContextManager):
mac_key = self._compute_key("E", mac_engine().digest_size)
sdctr = self.local_cipher.endswith("-ctr")
- if is_aead:
- self.packetizer.set_outbound_cipher(
- engine,
- block_size,
- None,
- 16,
- bytes(),
- sdctr,
- etm=False,
- aead=is_aead,
- iv_out=IV_out,
- )
- else:
- self.packetizer.set_outbound_cipher(
- engine,
- block_size,
- mac_engine,
- mac_size,
- mac_key,
- sdctr,
- etm=etm,
- )
+ self.packetizer.set_outbound_cipher(
+ block_engine=engine,
+ block_size=block_size,
+ mac_engine=None if aead else mac_engine,
+ mac_size=16 if aead else mac_size,
+ mac_key=None if aead else mac_key,
+ sdctr=sdctr,
+ etm=etm,
+ aead=aead,
+ iv_out=iv_out if aead else None,
+ )
compress_out = self._compression_info[self.local_compression][0]
if compress_out is not None and (