Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/345.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the ability to download files from within a Cisco IOS device.
27 changes: 27 additions & 0 deletions docs/user/lib_getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,33 @@ interface GigabitEthernet1
>>>
```

#### File Copy with URL

All devices that support file copy also support copying files directly from a URL to the device. This is useful for larger files like OS images. To do this, you need to use the `FilePullSpec` data model to specify the source file information and then pass that to the `file_copy` method. Currently only supported on Cisco IOS devices. Tested with ftp, http, https, sftp, and tftp urls.

```python
from pyntc.utils.models import FilePullSpec

>>> source_file = FilePullSpec(
... download_url='http://example.com/newconfig.cfg',
... checksum='abc123def456',
... hashing_algorithm='md5',
... file_name='newconfig.cfg'
... )
>>> for device in devices:
... device.file_copy(source_file)
...
>>>
```

We recommend setting up the ip client on the device to be able to use this feature. For instance, on a Cisco IOS device you would need to set the source interface for the ip client when using http or https urls. You can do this with the `config` method:

```python
>>> csr1.config('ip http client source-interface GigabitEthernet1')
>>>
```


### Save Configs

- `save` method
Expand Down
4 changes: 2 additions & 2 deletions pyntc/devices/base_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def file_copy(self, src, dest=None, **kwargs):

Keyword Args:
file_system (str): Supported only for IOS and NXOS. The file system for the
remote fle. If no file_system is provided, then the ``get_file_system``
remote file. If no file_system is provided, then the ``get_file_system``
method is used to determine the correct file system to use.
"""
raise NotImplementedError
Expand All @@ -241,7 +241,7 @@ def file_copy_remote_exists(self, src, dest=None, **kwargs):

Keyword Args:
file_system (str): Supported only for IOS and NXOS. The file system for the
remote fle. If no file_system is provided, then the ``get_file_system``
remote file. If no file_system is provided, then the ``get_file_system``
method is used to determine the correct file system to use.

Returns:
Expand Down
159 changes: 148 additions & 11 deletions pyntc/devices/ios_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
import os
import re
import time
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from netmiko import ConnectHandler, FileTransfer
from netmiko import ConnectHandler
from netmiko.cisco import CiscoIosFileTransfer
from netmiko.exceptions import ReadTimeout

from pyntc import log
from pyntc.devices.base_device import BaseDevice, RollbackError, fix_docs
from pyntc.errors import (
if TYPE_CHECKING:
from netmiko.base_connection import BaseConnection

from pyntc import log # pylint: disable=wrong-import-position

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make pylint and isort happy if you moved the conditional import above down below all of the other imports?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like it is going to be the solution I will test that.

from pyntc.devices.base_device import BaseDevice, RollbackError, fix_docs # pylint: disable=wrong-import-position
from pyntc.errors import ( # pylint: disable=wrong-import-position
CommandError,
CommandListError,
DeviceNotActiveError,
Expand All @@ -21,7 +26,8 @@
RebootTimeoutError,
SocketClosedError,
)
from pyntc.utils import get_structured_data
from pyntc.utils import get_structured_data # pylint: disable=wrong-import-position
from pyntc.utils.models import FilePullSpec # pylint: disable=wrong-import-position

BASIC_FACTS_KM = {"model": "hardware", "os_version": "version", "serial_number": "serial", "hostname": "hostname"}
RE_SHOW_REDUNDANCY = re.compile(
Expand All @@ -36,6 +42,129 @@
INSTALL_MODE_FILE_NAME = "packages.conf"


class FileTransferURLPull(CiscoIosFileTransfer):
"""Custom FileTransfer class to optionally allow downloading files from a url."""

# Netmiko's FileTransfer class requires a source_file to exist.
# When doing a url pull, the source_file is the FilePullSpec, which doesn't exist on the filesystem,
# so we have to override the __init__ method to get around this, and not call super().__init__(), since it assumes the file exists.
def __init__( # pylint: disable=super-init-not-called, too-many-positional-arguments
self,
ssh_conn: "BaseConnection",
source_file: Union[str, "FilePullSpec"],
dest_file: str,
file_system: Optional[str] = None,
direction: str = "put",
socket_timeout: float = 10.0,
progress: Optional[Callable[..., Any]] = None,
progress4: Optional[Callable[..., Any]] = None,
hash_supported: bool = True,
) -> None:
"""Uses Netmiko's FileTransfer class to transfer files to and from the device, but also supports pulling files directly from a URL to the device using the device's own download capabilities."""
self.ssh_ctl_chan = ssh_conn
self.source_file = source_file
self.dest_file = dest_file
self.direction = direction
self.socket_timeout = socket_timeout
self.progress = progress
self.progress4 = progress4
self.pull_from_url = direction == "url_pull"
self.source_sha1 = None

auto_flag = (
"cisco_ios" in ssh_conn.device_type
or "cisco_xe" in ssh_conn.device_type
or "cisco_xr" in ssh_conn.device_type
)
if not file_system:
if auto_flag:
self.file_system = self.ssh_ctl_chan._autodetect_fs()
else:
raise ValueError("Destination file system not specified")
else:
self.file_system = file_system

if direction == "put":
self.source_md5 = self.file_md5(source_file) if hash_supported else None
self.file_size = os.stat(source_file).st_size
elif direction == "get":
self.source_md5 = self.remote_md5(remote_file=source_file) if hash_supported else None
self.file_size = self.remote_file_size(remote_file=source_file)
elif direction == "url_pull":
if not isinstance(source_file, FilePullSpec):
raise ValueError("When direction is 'url_pull', source_file must be a FilePullSpec instance.")
# For url_pull, source_file is the URL and dest_file is the filename to save as on the device
if source_file.hashing_algorithm and source_file.hashing_algorithm.lower() not in {"md5", "sha512"}:
raise ValueError("When direction is 'url_pull', hashing_algorithm must be either 'md5' or 'sha512'.")
self.source_md5 = (
source_file.checksum if hash_supported and source_file.hashing_algorithm.lower() == "md5" else None
)
self.source_sha512 = (
source_file.checksum if hash_supported and source_file.hashing_algorithm.lower() == "sha512" else None
)
self.file_size = source_file.file_size
self.direction = (
"put" # FileTransfer only supports put and get, so treat url_pull as put for the transfer process
)
else:
raise ValueError("Invalid direction specified")

def pull_file(self) -> None:
"""Download the file from the URL to the device using the device's own download capabilities."""
url = self.source_file.clean_url
current_prompt = self.ssh_ctl_chan.find_prompt()
expect_regex = rf"(Destination filename|{re.escape(current_prompt)}|confirm|Password|Address or name of remote host|Source username|Source filename|yes/no|Are you sure you want to continue connecting)"
command = f"copy {url} {self.file_system}{self.dest_file}"
# Use VRF specified, unless using http or https, which don't support vrf in the copy command
if self.source_file.vrf and self.source_file.scheme not in ["http", "https"]:
command = f"{command} vrf {self.source_file.vrf}"
output = self.ssh_ctl_chan.send_command(command, expect_string=expect_regex, read_timeout=300)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to fail if the file transfer takes longer than 5 minutes? While the file is transferring netmiko is just waiting to match on one of the prompts right? I think we're still going to see use cases where large files are transferred over legacy circuits that may take a long time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair, I didn't know what to set this to. I will increase and make this a variable.

for _ in range(10):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be an endless loop or a while current_prompt not in output and we probably need to raise an exception if none of the prompts match?

if current_prompt in output:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a 1:1 mapping of prompts in this loop and prompts in expect_regex that's being passed to netmiko above but that's not clearly defined as a requirement. If someone adds another prompt to the regex above but forgets to add it here this would fall through and it would be difficult to troubleshoot. If netmiko doesn't already have a pattern for responding to prompts we should write our own here. eg:

prompt_answers = {
    current_prompt: None,  # break out of the loop
    r"(confirm|Address or name of remote host|Source filename|Destination filename)": "",  # Press enter
    r"Password": self.source_file.token,
    r"Source username": self.source_file.username,
    r"yes/no|Are you sure you want to continue connecting": "yes",
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could build the expect_regex from '|'.join(prompt_answers.keys()) so that the two code paths are always in sync.

return
# Assume that the filename and address are sent with the url.
Copy link

@gsnider2195 gsnider2195 Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Assume that the filename and address are sent with the url.
# Accept defaults for these prompts because the filename and address are sent with the url.

if re.search(
r"(confirm|Address or name of remote host|Source filename|Destination filename)", output, re.IGNORECASE
):
output = self.ssh_ctl_chan.send_command("", expect_string=expect_regex, read_timeout=300)
if re.search(r"Password", output, re.IGNORECASE):
output = self.ssh_ctl_chan.send_command(
self.source_file.token, expect_string=expect_regex, read_timeout=300, cmd_verify=False
)
if re.search(r"Source username", output, re.IGNORECASE):
output = self.ssh_ctl_chan.send_command(
self.source_file.username, expect_string=expect_regex, read_timeout=300
)
if re.search(r"yes/no|Are you sure you want to continue connecting", output, re.IGNORECASE):
output = self.ssh_ctl_chan.send_command("yes", expect_string=expect_regex, read_timeout=300)

def transfer_file(self) -> None:
"""Override the transfer_file method to pull from URL if pull_from_url is True."""
if self.pull_from_url:
self.pull_file()
else:
super().transfer_file()

def remote_sha512(self, base_cmd: str = "verify /sha512", remote_file: Optional[str] = None) -> str:
"""Calculate the sha512 hash of the file on the device."""
if remote_file is None:
if self.direction == "put":
remote_file = self.dest_file
elif self.direction == "get":
remote_file = self.source_file
remote_sha512_cmd = f"{base_cmd} {self.file_system}/{remote_file}"
dest_sha512 = self.ssh_ctl_chan._send_command_str(remote_sha512_cmd, read_timeout=300) # pylint: disable=protected-access
dest_sha512 = self.process_md5(dest_sha512) # Process MD5 still does what we want for parsing the output.
return dest_sha512

def compare_md5(self) -> bool:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're trying too hard to fit a square peg into a round hole subclassing the CiscoIosFileTransfer class. I would prefer if we just vendored the whole thing and rip out the parts we don't need (get/put). That gives us more flexibility to update it as we see fit to support things like different hashing algorithms. I know I won't be able to remember all of the workarounds that were required to make this work when the time comes to add support for another platform.

"""Overload compare_md5 to handle sha512."""
if self.source_sha512:
dest_sha512 = self.remote_sha512()
return self.source_sha512 == dest_sha512
return super().compare_md5()


@fix_docs
class IOSDevice(BaseDevice):
"""Cisco IOS Device Implementation."""
Expand Down Expand Up @@ -105,10 +234,16 @@ def _enter_config(self):
log.debug("Host %s: Device entered config mode.", self.host)

def _file_copy_instance(self, src, dest=None, file_system="flash:"):
"""Create a FileTransfer instance for copying a file to the device."""
direction = "put"
if dest is None:
dest = os.path.basename(src)
if isinstance(src, FilePullSpec):
dest = src.file_name
direction = "url_pull"
else:
dest = os.path.basename(src)

file_copy = FileTransfer(self.native, src, dest, file_system=file_system)
file_copy = FileTransferURLPull(self.native, src, dest, direction=direction, file_system=file_system)
log.debug("Host %s: File copy instance %s.", self.host, file_copy)
return file_copy

Expand Down Expand Up @@ -634,8 +769,9 @@ def file_copy(self, src, dest=None, file_system=None):
# raise FileTransferError('Not enough space available.')

try:
file_copy.enable_scp()
file_copy.establish_scp_conn()
if not isinstance(src, FilePullSpec):
file_copy.enable_scp()
file_copy.establish_scp_conn()
file_copy.transfer_file()
log.info("Host %s: File %s transferred successfully.", self.host, src)
except OSError as error:
Expand All @@ -648,7 +784,8 @@ def file_copy(self, src, dest=None, file_system=None):
log.error("Host %s: File transfer error %s", self.host, FileTransferError.default_message)
raise FileTransferError
finally:
file_copy.close_scp_chan()
if not isinstance(src, FilePullSpec):
file_copy.close_scp_chan()

# Ensure connection to device is still open after long transfers
self.open()
Expand All @@ -663,7 +800,7 @@ def file_copy(self, src, dest=None, file_system=None):

# TODO: Make this an internal method since exposing file_copy should be sufficient
def file_copy_remote_exists(self, src, dest=None, file_system=None):
"""Copy file to device.
"""Check if file exists on remote device.

Args:
src (str): Source of file.
Expand Down
73 changes: 73 additions & 0 deletions pyntc/utils/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Data Models for Pyntc."""

from dataclasses import asdict, dataclass, field
from typing import Optional
from urllib.parse import urlparse

# Use Hashing algorithms from Nautobot's supported list.
HASHING_ALGORITHMS = {"md5", "sha1", "sha224", "sha384", "sha256", "sha512", "sha3", "blake2", "blake3"}


@dataclass
class FilePullSpec:
"""Data class to represent the specification for pulling a file from a URL to a network device.

Args:
download_url (str): The URL to download the file from. Can include credentials, but it's recommended to use the username and token fields instead for security reasons.
checksum (str): The expected checksum of the file.
file_name (str): The name of the file to be saved on the device.
hashing_algorithm (str): The hashing algorithm to use for checksum verification. Defaults to "md5".

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
hashing_algorithm (str): The hashing algorithm to use for checksum verification. Defaults to "md5".
hashing_algorithm (str, optional): The hashing algorithm to use for checksum verification. Defaults to "md5".

file_size (int, optional): The expected size of the file in bytes. Optional but can be used for an additional layer of verification.
username (str, optional): The username for authentication if required by the URL. Optional if credentials are included in the URL.
token (str, optional): The password or token for authentication if required by the URL. Optional if credentials are included in the URL.
vrf (str, optional): The VRF to use for the download if the device supports VRFs. Optional.
ftp_passive (bool): Whether to use passive mode for FTP downloads. Defaults to True.
"""

download_url: str
checksum: str
file_name: str
hashing_algorithm: str = "md5"
file_size: Optional[int] = None # Size in bytes
username: Optional[str] = None
token: Optional[str] = None # Password/Token
vrf: Optional[str] = None
ftp_passive: bool = True

# This field is calculated, so we don't pass it in the constructor
clean_url: str = field(init=False)
scheme: str = field(init=False)

def __post_init__(self):
"""Validate the input and prepare the clean URL after initialization."""
# 1. Validate the hashing algorithm choice
if self.hashing_algorithm.lower() not in HASHING_ALGORITHMS:
raise ValueError(f"Unsupported algorithm. Choose from: {HASHING_ALGORITHMS}")

# Parse the url to extract components
parsed = urlparse(self.download_url)

# Extract username/password from URL if not already provided as arguments
if parsed.username and not self.username:
self.username = parsed.username
if parsed.password and not self.token:
self.token = parsed.password

# 3. Create the 'clean_url' (URL without the credentials)
# This is what you actually send to the device if using ip http client
port = f":{parsed.port}" if parsed.port else ""
self.clean_url = f"{parsed.scheme}://{parsed.hostname}{port}{parsed.path}"
self.scheme = parsed.scheme

# Handle query params if they exist (though we're avoiding '?' for Cisco)
if parsed.query:
self.clean_url += f"?{parsed.query}"

@classmethod
def from_dict(cls, data: dict):
"""Allows users to just pass a dictionary if they prefer."""
return cls(**data)

def to_dict(self):
"""Useful for logging or passing to other Nornir tasks."""
return asdict(self)
Loading