-
Notifications
You must be signed in to change notification settings - Fork 52
Add file_pull option to Cisco IOS devices #345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Added the ability to download files from within a Cisco IOS device. |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -3,13 +3,18 @@ | |||||
| import os | ||||||
| import re | ||||||
| import time | ||||||
| from typing import TYPE_CHECKING, Any, Callable, Optional, Union | ||||||
|
|
||||||
| from netmiko import ConnectHandler, FileTransfer | ||||||
| from netmiko import ConnectHandler | ||||||
| from netmiko.cisco import CiscoIosFileTransfer | ||||||
| from netmiko.exceptions import ReadTimeout | ||||||
|
|
||||||
| from pyntc import log | ||||||
| from pyntc.devices.base_device import BaseDevice, RollbackError, fix_docs | ||||||
| from pyntc.errors import ( | ||||||
| if TYPE_CHECKING: | ||||||
| from netmiko.base_connection import BaseConnection | ||||||
|
|
||||||
| from pyntc import log # pylint: disable=wrong-import-position | ||||||
| from pyntc.devices.base_device import BaseDevice, RollbackError, fix_docs # pylint: disable=wrong-import-position | ||||||
| from pyntc.errors import ( # pylint: disable=wrong-import-position | ||||||
| CommandError, | ||||||
| CommandListError, | ||||||
| DeviceNotActiveError, | ||||||
|
|
@@ -21,7 +26,8 @@ | |||||
| RebootTimeoutError, | ||||||
| SocketClosedError, | ||||||
| ) | ||||||
| from pyntc.utils import get_structured_data | ||||||
| from pyntc.utils import get_structured_data # pylint: disable=wrong-import-position | ||||||
| from pyntc.utils.models import FilePullSpec # pylint: disable=wrong-import-position | ||||||
|
|
||||||
| BASIC_FACTS_KM = {"model": "hardware", "os_version": "version", "serial_number": "serial", "hostname": "hostname"} | ||||||
| RE_SHOW_REDUNDANCY = re.compile( | ||||||
|
|
@@ -36,6 +42,129 @@ | |||||
| INSTALL_MODE_FILE_NAME = "packages.conf" | ||||||
|
|
||||||
|
|
||||||
| class FileTransferURLPull(CiscoIosFileTransfer): | ||||||
| """Custom FileTransfer class to optionally allow downloading files from a url.""" | ||||||
|
|
||||||
| # Netmiko's FileTransfer class requires a source_file to exist. | ||||||
| # When doing a url pull, the source_file is the FilePullSpec, which doesn't exist on the filesystem, | ||||||
| # so we have to override the __init__ method to get around this, and not call super().__init__(), since it assumes the file exists. | ||||||
| def __init__( # pylint: disable=super-init-not-called, too-many-positional-arguments | ||||||
| self, | ||||||
| ssh_conn: "BaseConnection", | ||||||
| source_file: Union[str, "FilePullSpec"], | ||||||
| dest_file: str, | ||||||
| file_system: Optional[str] = None, | ||||||
| direction: str = "put", | ||||||
| socket_timeout: float = 10.0, | ||||||
| progress: Optional[Callable[..., Any]] = None, | ||||||
| progress4: Optional[Callable[..., Any]] = None, | ||||||
| hash_supported: bool = True, | ||||||
| ) -> None: | ||||||
| """Uses Netmiko's FileTransfer class to transfer files to and from the device, but also supports pulling files directly from a URL to the device using the device's own download capabilities.""" | ||||||
| self.ssh_ctl_chan = ssh_conn | ||||||
| self.source_file = source_file | ||||||
| self.dest_file = dest_file | ||||||
| self.direction = direction | ||||||
| self.socket_timeout = socket_timeout | ||||||
| self.progress = progress | ||||||
| self.progress4 = progress4 | ||||||
| self.pull_from_url = direction == "url_pull" | ||||||
| self.source_sha1 = None | ||||||
|
|
||||||
| auto_flag = ( | ||||||
| "cisco_ios" in ssh_conn.device_type | ||||||
| or "cisco_xe" in ssh_conn.device_type | ||||||
| or "cisco_xr" in ssh_conn.device_type | ||||||
| ) | ||||||
| if not file_system: | ||||||
| if auto_flag: | ||||||
| self.file_system = self.ssh_ctl_chan._autodetect_fs() | ||||||
| else: | ||||||
| raise ValueError("Destination file system not specified") | ||||||
| else: | ||||||
| self.file_system = file_system | ||||||
|
|
||||||
| if direction == "put": | ||||||
| self.source_md5 = self.file_md5(source_file) if hash_supported else None | ||||||
| self.file_size = os.stat(source_file).st_size | ||||||
| elif direction == "get": | ||||||
| self.source_md5 = self.remote_md5(remote_file=source_file) if hash_supported else None | ||||||
| self.file_size = self.remote_file_size(remote_file=source_file) | ||||||
| elif direction == "url_pull": | ||||||
| if not isinstance(source_file, FilePullSpec): | ||||||
| raise ValueError("When direction is 'url_pull', source_file must be a FilePullSpec instance.") | ||||||
| # For url_pull, source_file is the URL and dest_file is the filename to save as on the device | ||||||
| if source_file.hashing_algorithm and source_file.hashing_algorithm.lower() not in {"md5", "sha512"}: | ||||||
| raise ValueError("When direction is 'url_pull', hashing_algorithm must be either 'md5' or 'sha512'.") | ||||||
| self.source_md5 = ( | ||||||
| source_file.checksum if hash_supported and source_file.hashing_algorithm.lower() == "md5" else None | ||||||
| ) | ||||||
| self.source_sha512 = ( | ||||||
| source_file.checksum if hash_supported and source_file.hashing_algorithm.lower() == "sha512" else None | ||||||
| ) | ||||||
| self.file_size = source_file.file_size | ||||||
| self.direction = ( | ||||||
| "put" # FileTransfer only supports put and get, so treat url_pull as put for the transfer process | ||||||
| ) | ||||||
| else: | ||||||
| raise ValueError("Invalid direction specified") | ||||||
|
|
||||||
| def pull_file(self) -> None: | ||||||
| """Download the file from the URL to the device using the device's own download capabilities.""" | ||||||
| url = self.source_file.clean_url | ||||||
| current_prompt = self.ssh_ctl_chan.find_prompt() | ||||||
| expect_regex = rf"(Destination filename|{re.escape(current_prompt)}|confirm|Password|Address or name of remote host|Source username|Source filename|yes/no|Are you sure you want to continue connecting)" | ||||||
| command = f"copy {url} {self.file_system}{self.dest_file}" | ||||||
| # Use VRF specified, unless using http or https, which don't support vrf in the copy command | ||||||
| if self.source_file.vrf and self.source_file.scheme not in ["http", "https"]: | ||||||
| command = f"{command} vrf {self.source_file.vrf}" | ||||||
| output = self.ssh_ctl_chan.send_command(command, expect_string=expect_regex, read_timeout=300) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this going to fail if the file transfer takes longer than 5 minutes? While the file is transferring netmiko is just waiting to match on one of the prompts right? I think we're still going to see use cases where large files are transferred over legacy circuits that may take a long time.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair, I didn't know what to set this to. I will increase and make this a variable. |
||||||
| for _ in range(10): | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be an endless loop or a |
||||||
| if current_prompt in output: | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a 1:1 mapping of prompts in this loop and prompts in prompt_answers = {
current_prompt: None, # break out of the loop
r"(confirm|Address or name of remote host|Source filename|Destination filename)": "", # Press enter
r"Password": self.source_file.token,
r"Source username": self.source_file.username,
r"yes/no|Are you sure you want to continue connecting": "yes",
}There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could build the |
||||||
| return | ||||||
| # Assume that the filename and address are sent with the url. | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| if re.search( | ||||||
| r"(confirm|Address or name of remote host|Source filename|Destination filename)", output, re.IGNORECASE | ||||||
| ): | ||||||
| output = self.ssh_ctl_chan.send_command("", expect_string=expect_regex, read_timeout=300) | ||||||
| if re.search(r"Password", output, re.IGNORECASE): | ||||||
| output = self.ssh_ctl_chan.send_command( | ||||||
| self.source_file.token, expect_string=expect_regex, read_timeout=300, cmd_verify=False | ||||||
| ) | ||||||
| if re.search(r"Source username", output, re.IGNORECASE): | ||||||
| output = self.ssh_ctl_chan.send_command( | ||||||
| self.source_file.username, expect_string=expect_regex, read_timeout=300 | ||||||
| ) | ||||||
| if re.search(r"yes/no|Are you sure you want to continue connecting", output, re.IGNORECASE): | ||||||
| output = self.ssh_ctl_chan.send_command("yes", expect_string=expect_regex, read_timeout=300) | ||||||
|
|
||||||
| def transfer_file(self) -> None: | ||||||
| """Override the transfer_file method to pull from URL if pull_from_url is True.""" | ||||||
| if self.pull_from_url: | ||||||
| self.pull_file() | ||||||
| else: | ||||||
| super().transfer_file() | ||||||
|
|
||||||
| def remote_sha512(self, base_cmd: str = "verify /sha512", remote_file: Optional[str] = None) -> str: | ||||||
| """Calculate the sha512 hash of the file on the device.""" | ||||||
| if remote_file is None: | ||||||
| if self.direction == "put": | ||||||
| remote_file = self.dest_file | ||||||
| elif self.direction == "get": | ||||||
| remote_file = self.source_file | ||||||
| remote_sha512_cmd = f"{base_cmd} {self.file_system}/{remote_file}" | ||||||
| dest_sha512 = self.ssh_ctl_chan._send_command_str(remote_sha512_cmd, read_timeout=300) # pylint: disable=protected-access | ||||||
| dest_sha512 = self.process_md5(dest_sha512) # Process MD5 still does what we want for parsing the output. | ||||||
| return dest_sha512 | ||||||
|
|
||||||
| def compare_md5(self) -> bool: | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we're trying too hard to fit a square peg into a round hole subclassing the |
||||||
| """Overload compare_md5 to handle sha512.""" | ||||||
| if self.source_sha512: | ||||||
| dest_sha512 = self.remote_sha512() | ||||||
| return self.source_sha512 == dest_sha512 | ||||||
| return super().compare_md5() | ||||||
|
|
||||||
|
|
||||||
| @fix_docs | ||||||
| class IOSDevice(BaseDevice): | ||||||
| """Cisco IOS Device Implementation.""" | ||||||
|
|
@@ -105,10 +234,16 @@ def _enter_config(self): | |||||
| log.debug("Host %s: Device entered config mode.", self.host) | ||||||
|
|
||||||
| def _file_copy_instance(self, src, dest=None, file_system="flash:"): | ||||||
| """Create a FileTransfer instance for copying a file to the device.""" | ||||||
| direction = "put" | ||||||
| if dest is None: | ||||||
| dest = os.path.basename(src) | ||||||
| if isinstance(src, FilePullSpec): | ||||||
| dest = src.file_name | ||||||
| direction = "url_pull" | ||||||
| else: | ||||||
| dest = os.path.basename(src) | ||||||
|
|
||||||
| file_copy = FileTransfer(self.native, src, dest, file_system=file_system) | ||||||
| file_copy = FileTransferURLPull(self.native, src, dest, direction=direction, file_system=file_system) | ||||||
| log.debug("Host %s: File copy instance %s.", self.host, file_copy) | ||||||
| return file_copy | ||||||
|
|
||||||
|
|
@@ -634,8 +769,9 @@ def file_copy(self, src, dest=None, file_system=None): | |||||
| # raise FileTransferError('Not enough space available.') | ||||||
|
|
||||||
| try: | ||||||
| file_copy.enable_scp() | ||||||
| file_copy.establish_scp_conn() | ||||||
| if not isinstance(src, FilePullSpec): | ||||||
| file_copy.enable_scp() | ||||||
| file_copy.establish_scp_conn() | ||||||
| file_copy.transfer_file() | ||||||
| log.info("Host %s: File %s transferred successfully.", self.host, src) | ||||||
| except OSError as error: | ||||||
|
|
@@ -648,7 +784,8 @@ def file_copy(self, src, dest=None, file_system=None): | |||||
| log.error("Host %s: File transfer error %s", self.host, FileTransferError.default_message) | ||||||
| raise FileTransferError | ||||||
| finally: | ||||||
| file_copy.close_scp_chan() | ||||||
| if not isinstance(src, FilePullSpec): | ||||||
| file_copy.close_scp_chan() | ||||||
|
|
||||||
| # Ensure connection to device is still open after long transfers | ||||||
| self.open() | ||||||
|
|
@@ -663,7 +800,7 @@ def file_copy(self, src, dest=None, file_system=None): | |||||
|
|
||||||
| # TODO: Make this an internal method since exposing file_copy should be sufficient | ||||||
| def file_copy_remote_exists(self, src, dest=None, file_system=None): | ||||||
| """Copy file to device. | ||||||
| """Check if file exists on remote device. | ||||||
|
|
||||||
| Args: | ||||||
| src (str): Source of file. | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,73 @@ | ||||||
| """Data Models for Pyntc.""" | ||||||
|
|
||||||
| from dataclasses import asdict, dataclass, field | ||||||
| from typing import Optional | ||||||
| from urllib.parse import urlparse | ||||||
|
|
||||||
| # Use Hashing algorithms from Nautobot's supported list. | ||||||
| HASHING_ALGORITHMS = {"md5", "sha1", "sha224", "sha384", "sha256", "sha512", "sha3", "blake2", "blake3"} | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class FilePullSpec: | ||||||
| """Data class to represent the specification for pulling a file from a URL to a network device. | ||||||
|
|
||||||
| Args: | ||||||
| download_url (str): The URL to download the file from. Can include credentials, but it's recommended to use the username and token fields instead for security reasons. | ||||||
| checksum (str): The expected checksum of the file. | ||||||
| file_name (str): The name of the file to be saved on the device. | ||||||
| hashing_algorithm (str): The hashing algorithm to use for checksum verification. Defaults to "md5". | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| file_size (int, optional): The expected size of the file in bytes. Optional but can be used for an additional layer of verification. | ||||||
| username (str, optional): The username for authentication if required by the URL. Optional if credentials are included in the URL. | ||||||
| token (str, optional): The password or token for authentication if required by the URL. Optional if credentials are included in the URL. | ||||||
| vrf (str, optional): The VRF to use for the download if the device supports VRFs. Optional. | ||||||
| ftp_passive (bool): Whether to use passive mode for FTP downloads. Defaults to True. | ||||||
| """ | ||||||
|
|
||||||
| download_url: str | ||||||
| checksum: str | ||||||
| file_name: str | ||||||
| hashing_algorithm: str = "md5" | ||||||
| file_size: Optional[int] = None # Size in bytes | ||||||
| username: Optional[str] = None | ||||||
| token: Optional[str] = None # Password/Token | ||||||
| vrf: Optional[str] = None | ||||||
| ftp_passive: bool = True | ||||||
|
|
||||||
| # This field is calculated, so we don't pass it in the constructor | ||||||
| clean_url: str = field(init=False) | ||||||
| scheme: str = field(init=False) | ||||||
|
|
||||||
| def __post_init__(self): | ||||||
| """Validate the input and prepare the clean URL after initialization.""" | ||||||
| # 1. Validate the hashing algorithm choice | ||||||
| if self.hashing_algorithm.lower() not in HASHING_ALGORITHMS: | ||||||
| raise ValueError(f"Unsupported algorithm. Choose from: {HASHING_ALGORITHMS}") | ||||||
|
|
||||||
| # Parse the url to extract components | ||||||
| parsed = urlparse(self.download_url) | ||||||
|
|
||||||
| # Extract username/password from URL if not already provided as arguments | ||||||
| if parsed.username and not self.username: | ||||||
| self.username = parsed.username | ||||||
| if parsed.password and not self.token: | ||||||
| self.token = parsed.password | ||||||
|
|
||||||
| # 3. Create the 'clean_url' (URL without the credentials) | ||||||
| # This is what you actually send to the device if using ip http client | ||||||
| port = f":{parsed.port}" if parsed.port else "" | ||||||
| self.clean_url = f"{parsed.scheme}://{parsed.hostname}{port}{parsed.path}" | ||||||
| self.scheme = parsed.scheme | ||||||
|
|
||||||
| # Handle query params if they exist (though we're avoiding '?' for Cisco) | ||||||
| if parsed.query: | ||||||
| self.clean_url += f"?{parsed.query}" | ||||||
|
|
||||||
| @classmethod | ||||||
| def from_dict(cls, data: dict): | ||||||
| """Allows users to just pass a dictionary if they prefer.""" | ||||||
| return cls(**data) | ||||||
|
|
||||||
| def to_dict(self): | ||||||
| """Useful for logging or passing to other Nornir tasks.""" | ||||||
| return asdict(self) | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make pylint and isort happy if you moved the conditional import above down below all of the other imports?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds like it is going to be the solution I will test that.