1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
from collections import namedtuple
from .agent import AgentKey
from .util import get_logger
from .ssh_exception import AuthenticationException
class AuthSource:
    """
    Base class for anything an SSH session can authenticate with.

    Typical sources are a password, a private key, or an agent; the concrete
    behavior lives in the subclasses found in this module.

    Every implementation must accept at least a ``username`` (``str``) kwarg.
    """

    def __init__(self, username):
        self.username = username

    def _repr(self, **kwargs):
        """
        Render ``ClassName(key=value, ...)`` from the given keyword arguments.

        Values are shown via `repr`, in the order the kwargs were passed.
        """
        # TODO: are there any good libs for this? maybe some helper from
        # structlog?
        rendered = ", ".join(
            f"{name}={value!r}" for name, value in kwargs.items()
        )
        cls_name = self.__class__.__name__
        return "{}({})".format(cls_name, rendered)

    def __repr__(self):
        # The base class has nothing interesting to show; subclasses override
        # this and hand their own fields to _repr().
        return self._repr()

    def authenticate(self, transport):
        """
        Perform authentication.

        Must be overridden; the base implementation always raises
        `NotImplementedError`.
        """
        raise NotImplementedError
class NoneAuth(AuthSource):
    """
    Auth type "none", ie https://www.rfc-editor.org/rfc/rfc4252#section-5.2 .
    """

    # NOTE: this must be a class (not a function) so that it inherits
    # ``__init__(username)`` and ``__repr__`` from `AuthSource` and can be
    # instantiated like every other auth source.
    def authenticate(self, transport):
        """
        Attempt "none" authentication for ``self.username`` on ``transport``.

        Returns whatever ``transport.auth_none`` returns.
        """
        return transport.auth_none(self.username)
class Password(AuthSource):
    """
    Password authentication.

    :param callable password_getter:
        A lazy callable that should return a `str` password value at
        authentication time, such as a `functools.partial` wrapping
        `getpass.getpass`, an API call to a secrets store, or similar.

        If the password is already known at instantiation time, wrap it in a
        trivial callable: ``lambda: "my literal"`` (for a literal, but also,
        shame on you!) or ``lambda: variable_name`` (for something stored in
        a variable).
    """

    def __init__(self, username, password_getter):
        super().__init__(username=username)
        # Stored un-called: the secret is only fetched when we authenticate.
        self.password_getter = password_getter

    def __repr__(self):
        # Password auth is marginally more 'username-caring' than pkeys, so
        # the username is worth showing here.
        return super()._repr(user=self.username)

    def authenticate(self, transport):
        """
        Fetch the password and submit it via ``transport.auth_password``.
        """
        # Resolve the password as late as possible; the getter may well be
        # prompting a human.
        # TODO: be nice to log source _of_ the password?
        secret = self.password_getter()
        return transport.auth_password(self.username, secret)
class PrivateKey(AuthSource):
    """
    Essentially a mixin for private keys.

    Implements the authentication step only; finding, loading and decrypting
    the key material is the job of subclasses.

    Subclasses **must** make sure ``self.pkey`` holds a decrypted `.PKey`
    instance before ``super().authenticate`` runs - usually by setting it in
    their ``__init__``, or in an overridden ``authenticate`` ahead of its
    `super` call.
    """

    def authenticate(self, transport):
        """
        Submit ``self.pkey`` for ``self.username`` via publickey auth.
        """
        attempt = transport.auth_publickey
        return attempt(self.username, self.pkey)
class InMemoryPrivateKey(PrivateKey):
    """
    An in-memory, decrypted `.PKey` object.
    """

    def __init__(self, username, pkey):
        super().__init__(username=username)
        # Already in memory, so (presumably) nothing needs decrypting.
        self.pkey = pkey

    def __repr__(self):
        # NOTE: most of interesting repr-bits for private keys is in PKey.
        text = super()._repr(pkey=self.pkey)
        # TODO: tacking on agent-ness like this is a bit awkward, but, eh?
        marker = " [agent]" if isinstance(self.pkey, AgentKey) else ""
        return text + marker
class OnDiskPrivateKey(PrivateKey):
    """
    Some on-disk private key that needs opening and possibly decrypting.

    :param str source:
        String tracking where this key's path was specified; should be one of
        ``"ssh-config"``, ``"python-config"``, or ``"implicit-home"``.
    :param Path path:
        The filesystem path this key was loaded from.
    :param PKey pkey:
        The `PKey` object this auth source uses/represents.

    :raises ValueError: if ``source`` is not one of the values listed above.
    """

    # TODO: how to log/note how this path came to our attention (ssh_config,
    # fabric config, some direct kwarg somewhere, CLI flag, etc)? Different
    # subclasses for all of those seems like massive overkill, so just some
    # sort of "via" or "source" string argument?
    def __init__(self, username, source, path, pkey):
        super().__init__(username=username)
        self.source = source
        # Enforce the documented set of sources so a typo is caught at
        # construction time instead of silently ending up in logs/reprs.
        allowed = ("ssh-config", "python-config", "implicit-home")
        if source not in allowed:
            raise ValueError(f"source argument must be one of: {allowed!r}")
        self.path = path
        # Superclass wants .pkey, other two are mostly for display/debugging.
        self.pkey = pkey

    def __repr__(self):
        # The path is stringified so Path objects display compactly.
        return self._repr(
            key=self.pkey, source=self.source, path=str(self.path)
        )
# TODO re sources: is there anything in an OpenSSH config file that doesn't fit
# into what Paramiko already had kwargs for?
# One attempted auth source paired with the outcome of that attempt.
SourceResult = namedtuple("SourceResult", "source result")
# TODO: tempting to make this an OrderedDict, except the keys essentially want
# to be rich objects (AuthSources) which do not make for useful user indexing?
# TODO: members being vanilla tuples is pretty old-school/expedient; they
# "really" want to be something that's type friendlier (unless the tuple's 2nd
# member being a Union of two types is "fine"?), which I assume means yet more
# classes, eg an abstract SourceResult with concrete AuthSuccess and
# AuthFailure children?
# TODO: arguably we want __init__ typechecking of the members (or to leverage
# mypy by classifying this literally as list-of-AuthSource?)
class AuthResult(list):
    """
    Represents a partial or complete SSH authentication attempt.

    This class conceptually extends `AuthStrategy` by pairing the former's
    authentication **sources** with the **results** of trying to authenticate
    with them.

    `AuthResult` is a (subclass of) `list` of `namedtuple`, which are of the
    form ``namedtuple('SourceResult', 'source', 'result')`` (where the
    ``source`` member is an `AuthSource` and the ``result`` member is either a
    return value from the relevant `.Transport` method, or an exception
    object).

    Instances also have a `strategy` attribute referencing the `AuthStrategy`
    which was attempted.

    :param strategy: the strategy object these results belong to.

    Any further positional/keyword arguments are handed to `list` unchanged.
    """

    def __init__(self, strategy, *args, **kwargs):
        self.strategy = strategy
        super().__init__(*args, **kwargs)

    def __str__(self):
        """
        Return one ``<source> -> <result>`` line per attempt.

        A falsy result is displayed as ``success``. This presumably covers
        the empty list of remaining auth types that `.Transport` auth methods
        are expected to return when authentication fully succeeds; printing
        ``[]`` for that case is not helpful to a reader.
        """
        # NOTE: meaningfully distinct from __repr__, which still wants to use
        # superclass' implementation.
        # TODO: go hog wild, use rich.Table? how is that on degraded term's?
        return "\n".join(
            f"{x.source} -> {x.result or 'success'}" for x in self
        )
# TODO 4.0: descend from SSHException or even just Exception
class AuthFailure(AuthenticationException):
    """
    Basic exception wrapping an `AuthResult` indicating overall auth failure.

    Although `AuthFailure` inherits from `AuthenticationException`, it sits at
    a "higher level": the parent is now only raised by individual
    `AuthSource` attempts, and users should typically only encounter it
    wrapped inside this class. The inheritance exists primarily for backwards
    compatibility reasons.
    """

    def __init__(self, result):
        # The full per-source record of what was tried and how it went.
        self.result = result

    def __str__(self):
        # Start on a fresh line so the multi-line result is not glued to
        # whatever text precedes the exception message.
        return "".join(("\n", str(self.result)))
class AuthStrategy:
    """
    This class represents one or more attempts to auth with an SSH server.

    By default, subclasses must at least accept an ``ssh_config``
    (`.SSHConfig`) keyword argument, but may opt to accept more as needed for
    their particular strategy.
    """

    def __init__(
        self,
        ssh_config,
    ):
        self.ssh_config = ssh_config
        self.log = get_logger(__name__)

    def get_sources(self):
        """
        Generator yielding `AuthSource` instances, in the order to try.

        This is the primary override point for subclasses: you figure out what
        sources you need, and ``yield`` them.

        Subclasses _of_ subclasses may find themselves wanting to do things
        like filtering or discarding around a call to `super`.
        """
        raise NotImplementedError

    def authenticate(self, transport):
        """
        Handles attempting `AuthSource` instances yielded from `get_sources`.

        You *normally* won't need to override this, but it's an option for
        advanced users.

        Sources are tried in order; iteration stops at the first one whose
        ``authenticate(transport)`` call returns without raising.

        :param transport:
            Object passed through to each source's ``authenticate`` method.

        :returns:
            An `AuthResult` holding one `SourceResult` per attempted source,
            the last of which is the successful one.

        :raises AuthFailure:
            If no source succeeded (including when no sources were yielded);
            its ``result`` attribute carries the same `AuthResult`.
        """
        succeeded = False
        overall_result = AuthResult(strategy=self)
        # TODO: arguably we could fit in a "send none auth, record allowed auth
        # types sent back" thing here as OpenSSH-client does, but that likely
        # wants to live in fabric.OpenSSHAuthStrategy as not all target servers
        # will implement it!
        # TODO: needs better "server told us too many attempts" checking!
        # NOTE: get_sources() takes no arguments; the transport is only needed
        # by the individual sources' authenticate() calls below.
        for source in self.get_sources():
            self.log.debug(f"Trying {source}")
            try:
                # Only the authenticate() call itself is guarded.
                result = source.authenticate(transport)
            # TODO: 'except PartialAuthentication' is needed for 2FA and
            # similar, as per old SSHClient.connect - it is the only way
            # AuthHandler supplies access to the 'name-list' field from
            # MSG_USERAUTH_FAILURE, at present.
            except Exception as e:
                # Record the exception itself as this source's result.
                result = e
                # TODO: look at what this could possibly raise, we don't really
                # want Exception here, right? just SSHException subclasses? or
                # do we truly want to capture anything at all with assumption
                # it's easy enough for users to look afterwards?
                # NOTE: showing type, not message, for tersity & also most of
                # the time it's basically just "Authentication failed."
                source_class = e.__class__.__name__
                self.log.info(
                    f"Authentication via {source} failed with {source_class}"
                )
            else:
                succeeded = True
            overall_result.append(SourceResult(source, result))
            if succeeded:
                break
        # Gotta die here if nothing worked, otherwise Transport's main loop
        # just kinda hangs out until something times out!
        if not succeeded:
            raise AuthFailure(result=overall_result)
        # Success: give back what was done, in case they care.
        return overall_result
# TODO: is there anything OpenSSH client does which _can't_ cleanly map to
# iterating a generator?
|