Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions lib/jnpr/junos/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,14 @@ def __init__(self, *vargs, **kvargs):
*OPTIONAL* To disable public key authentication.
default is ``None``.

:param bool allow_agent:
*OPTIONAL* Specifies whether to use keys provided by an SSH agent for authentication.
If set to ``True``, the SSH connection will use any keys loaded in the agent.
If set to ``False``, keys from the SSH agent will not be used.
If set to ``None``, the default behavior is applied: agent keys are used only if
both password and private key file are not provided.
Default is ``None``.

:param str bind_addr:
*OPTIONAL* To use (local) source IP address.
default is ``None``.
Expand All @@ -1243,6 +1251,7 @@ def __init__(self, *vargs, **kvargs):
self._huge_tree = kvargs.get("huge_tree", False)
self._conn_open_timeout = kvargs.get("conn_open_timeout", 30)
self._look_for_keys = kvargs.get("look_for_keys", None)
self._allow_agent = kvargs.get("allow_agent", None)
self._bind_addr = kvargs.get("bind_addr", None)
self._hostkey_verify = kvargs.get("hostkey_verify", False)
if self._fact_style != "new":
Expand Down Expand Up @@ -1367,9 +1376,18 @@ def open(self, *vargs, **kvargs):
# in this condition it means we want to query the agent
# for available ssh keys

allow_agent = bool(
(self._auth_password is None) and (self._ssh_private_key_file is None)
)
if self._allow_agent is True:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please modify the code

if self._allow_agent is not None:
allow_agent = self._allow_agent
else:
# Default: only allow agent if no password and no private key are provided
allow_agent = self._auth_password is None and self._ssh_private_key_file is None

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will check

# If set to True by the user, override with value as per user.
allow_agent = self._allow_agent
elif self._allow_agent is False:
# If set to False by the user, override with value as per user.
allow_agent = self._allow_agent
else:
# Default behaviour if allow_agent is None
allow_agent = bool(
(self._auth_password is None)
and (self._ssh_private_key_file is None)
)

# option to disable ncclient transport ssh authentication
# using public keys look_for_keys=False
Expand Down
108 changes: 102 additions & 6 deletions tests/functional/test_device_ssh.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,116 @@
__author__ = "rsherman, vnitinv"

import unittest

from jnpr.junos import Device

try:
import unittest2 as unittest
except ImportError:
import unittest

class TestDeviceSsh(unittest.TestCase):
def tearDown(self):
self.dev.close()

def test_device_open_key_pass(self):
self.dev = Device(
host="xxxx",
user="jenkins",
ssh_private_key_file="/var/lib/jenkins/.ssh/passkey",
passwd="password",
host="x.x.x.x",
user="netops",
ssh_private_key_file="~/.ssh/id_rsa",
passwd="net123",
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_password(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
passwd="net123",
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_ssh_agent_true(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
allow_agent=True
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_ssh_agent_false(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
allow_agent=False,
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_key_file(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
ssh_private_key_file="~/.ssh/id_rsa",
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_key_file(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
ssh_private_key_file="~/.ssh/id_rsa",
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_proxy(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
proxy_command="ssh -J [email protected]"
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_ssh_agent_proxy(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
proxy_command="ssh -J [email protected]",
allow_agent=True,
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_key_file_proxy(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
proxy_command="ssh -J [email protected]",
ssh_private_key_file="~/.ssh/id_rsa",
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_ssh_agent_proxy(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
proxy_command="ssh -J [email protected]",
allow_agent=True,
)
self.dev.open()
self.assertEqual(self.dev.connected, True)

def test_device_open_key_file_proxy(self):
self.dev = Device(
host="x.x.x.x",
user="netops",
proxy_command="ssh -J [email protected]",
ssh_private_key_file="~/.ssh/id_rsa",
)
self.dev.open()
self.assertEqual(self.dev.connected, True)