Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lenovo_update_firmware json file removal #115

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 197 additions & 33 deletions examples/lenovo_update_firmware.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
:type fsdir: string
:returns: returns firmware updating result
"""
result = {}
download_flag = False
fixid = os.path.splitext(image)[0]
xml_file = fixid + '.xml'
# Connect using the address, account name, and password
login_host = "https://" + ip
login_host = "https://" + ip
try:
# Create a REDFISH object
result = {}
REDFISH_OBJ = redfish.redfish_client(base_url=login_host, username=login_account,
password=login_password, default_prefix='/redfish/v1', cafile=utils.g_CAFILE)
REDFISH_OBJ.login(auth="basic")
Expand All @@ -82,6 +85,38 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
return result

response_update_service_url = REDFISH_OBJ.get(update_service_url, None)

if fsprotocol.lower() != 'http':
# Check user specify parameter
if fsprotocol.upper() in ["SFTP", "TFTP"]:
download_flag = sftp_download_xmlfile(xml_file, fsip, fsport, fsusername, fspassword, fsdir)
if fsprotocol.upper() == 'TFTP':
fsport = '69'
fsdir = '/upload'

# Defines a dictionary for storing pre request information.
if not download_flag:
xml_file_path = fsdir + os.sep + fixid + '.xml'
else:
xml_file_path = os.getcwd() + os.sep + xml_file

if os.path.isfile(xml_file_path):
sups_info_dict = parse_xml(xml_file_path)
if download_flag:
# Delete xml file
os.remove(xml_file_path)
else:
result = {'ret': False, 'msg': "The firmware xml doesn't exist, please download the firmware xml file."}
return result

# Check whether the prerequest version meets the requirements
prereq_list = sups_info_dict['prereq']
if prereq_list:
pre_result = check_pre_req_version(prereq_list,response_update_service_url,REDFISH_OBJ)
if pre_result['ret'] != True:
result = pre_result
return result

if response_update_service_url.status == 200:
# Check if BMC is 20A or before version of SR635/655. if yes, go through OEM way, else, call standard update way.
if "Oem" in response_update_service_url.dict['Actions']:
Expand All @@ -90,9 +125,22 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
result = {'ret': False,
'msg': "SR635/SR655 products only supports specifying BMC or UEFI to refresh."}
return result
# Check if BMC is 20B version of SR635/655, if yes, use multipart Uri to update Firmware
if "MultipartHttpPushUri" in response_update_service_url.dict and fsprotocol.upper() == "HTTPPUSH":
Multipart_Uri = login_host + response_update_service_url.dict["MultipartHttpPushUri"]

# Check if multiparthttppushuri exists between response_update_service_url.dict['Oem']['AMIUpdateService'] / response_update_service_url.dict
multiparthttppushuri_exist = False
multiparthttppushuri_oem_exist = False
if 'Oem' in response_update_service_url.dict:
if 'AMIUpdateService' in response_update_service_url.dict['Oem']:
if "MultipartHttpPushUri" in response_update_service_url.dict['Oem']['AMIUpdateService']:
multiparthttppushuri_oem_exist = True
if "MultipartHttpPushUri" in response_update_service_url.dict:
multiparthttppushuri_exist = True
# if yes, use multipart Uri to update Firmware
if (multiparthttppushuri_oem_exist is True or multiparthttppushuri_exist is True) and fsprotocol.upper() == "HTTPPUSH":
if multiparthttppushuri_oem_exist is True:
firmware_update_url = login_host + response_update_service_url.dict['Oem']['AMIUpdateService']["MultipartHttpPushUri"]
elif multiparthttppushuri_exist is True:
firmware_update_url = login_host + response_update_service_url.dict["MultipartHttpPushUri"]
BMC_Parameter = {"Targets": ["/redfish/v1/Managers/Self"]}
if targets[0].upper() == "BMC":
Oem_Parameter = {"FlashType": "HPMFwUpdate", "UploadSelector": "Default"}
Expand All @@ -103,34 +151,21 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
'msg': "SR635/SR655 products only supports specifying BMC or UEFI to refresh."}
return result

# Create a temporary file to write to the OEM value
parameter_file = os.getcwd() + os.sep + "parameters.json"
oem_parameter_file = os.getcwd() + os.sep + "oem_parameters.json"

with open(parameter_file, 'w') as f:
f.write(json.dumps(BMC_Parameter))
with open(oem_parameter_file, 'w') as f:
f.write(json.dumps(Oem_Parameter))

F_parameter = open(parameter_file, 'rb')
F_oparameter = open(oem_parameter_file, 'rb')
F_image = open(fsdir + os.sep + image, 'rb')
# Specify the parameters required to update the firmware
files = {'UpdateParameters': ("parameters.json", F_parameter, 'application/json'),
'OemParameters': (
"oem_parameters.json", F_oparameter, 'application/json'),
'UpdateFile': (image, open(fsdir + os.sep + image, 'rb'), 'multipart/form-data')}
files = {'UpdateParameters': ("parameters.json", json.dumps(BMC_Parameter), 'application/json'),
'OemParameters': ("oem_parameters.json", json.dumps(Oem_Parameter), 'application/json'),
'UpdateFile': (image, F_image, 'multipart/form-data')}

# Send a post command through requests to update the firmware
# Ignore SSL Certificates
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# Set BMC access credential
auth = HTTPBasicAuth(login_account, login_password)
print("Start to upload the image, may take about 3~10 minutes...\n")
firmware_update_url = Multipart_Uri
response = requests.post(Multipart_Uri, auth=auth, files=files, verify=False)
response = requests.post(firmware_update_url, auth=auth, files=files, verify=False)
response_code = response.status_code
F_parameter.close()
F_oparameter.close()
F_image.close()
else:
if fsprotocol.upper() != "HTTP":
result = {'ret': False, 'msg': "SR635/SR655 products only supports the HTTP protocol to update firmware."}
Expand All @@ -154,13 +189,13 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
response = REDFISH_OBJ.post(firmware_update_url, body=body)
response_code = response.status
else:
result = update_firmware.update_fw(ip, login_account, login_password, image, targets, fsprotocol, fsip,
result = update_firmware.update_firmware(ip, login_account, login_password, image, targets, fsprotocol, fsip,
fsport, fsusername, fspassword, fsdir)
return result

if response_code in [200, 202, 204]:
# For BMC update, BMC will restart automatically, the session connection will be disconnected, user have to wait BMC to restart.
# For UEFI update, the script can monitor the update task via BMC.
# For UEFI update, the script can monitor the update task via BMC.
if targets[0].upper() == "BMC":
result = {'ret': True, 'msg': 'BMC refresh successfully, wait about 5 minutes for BMC to restart.'}
return result
Expand All @@ -177,7 +212,8 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
result = {'ret': True, 'msg': "Update firmware successfully"}
else:
task_id = result["id"]
result = {'ret': False, 'msg': "Failed to update firmware, task id is %s, task state is %s" % (task_id, task_state) }
result = {'ret': False,'msg': "Failed to update firmware, task id is %s, task state is %s" % (task_id, task_state)}
# Delete task
REDFISH_OBJ.delete(task_uri, None)
return result
else:
Expand All @@ -194,11 +230,6 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
except Exception as e:
result = {'ret': False, 'msg': "error_message: %s" % (e)}
finally:
# Delete the temporary file if it exists
if os.path.exists(os.getcwd() + os.sep + "parameters.json"):
os.remove(os.getcwd() + os.sep + "parameters.json")
if os.path.exists(os.getcwd() + os.sep + "oem_parameters.json"):
os.remove(os.getcwd() + os.sep + "oem_parameters.json")
# Logout of the current session
try:
REDFISH_OBJ.logout()
Expand All @@ -207,6 +238,139 @@ def lenovo_update_firmware(ip, login_account, login_password, image, targets, fs
return result


import xml.etree.ElementTree as ET
import os
import re
def parse_xml(xml_file_path):
"""Parse fixid xml files"""
tree = ET.parse(xml_file_path)
root = tree.getroot()
# Build an empty dictionary to store information in XML
sups_dict = {}
# Build an empty list to store prerequest information
pre_req = []
mt_list = []
# Add MT
info_list = ["category", "Version", "payload", "xmlFilename", "preReq", "buildNumber"]
for node in root.iter('PROPERTY.ARRAY'):
if "preReq" in node.attrib['NAME']:
for value in node.iter('VALUE'):
pre_req.append(value.text)
if "applicableMachineTypes" in node.attrib['NAME']:
for MT in node.iter('VALUE'):
regularity = re.compile(r"[[](.*?)[]]")
mt = regularity.findall(MT.text)
mt_list.append(mt)
sups_dict['prereq'] = pre_req
for node in root.iter('PROPERTY'):
for info_name in info_list:
if node.attrib['NAME'] == info_name:
for value in node.iter('VALUE'):
sups_dict[info_name] = value.text
sups_dict['Version'] = sups_dict['Version'].split(' ')[1]
sups_dict['mt_list'] = mt_list
return sups_dict


def check_pre_req_version(prereq_list,response_update_service_url,REDFISH_OBJ):
''' Check whether the prerequest version meets the requirements '''
pre_req_flag_xcc = {'status': True}
pre_req_flag_uefi = {'status': True}
firmwareInventory_url_bmc = response_update_service_url.dict['FirmwareInventory']['@odata.id'] + '/' + 'BMC-Primary'
update_firmware_dict_bmc = REDFISH_OBJ.get(firmwareInventory_url_bmc, None)
firmwareInventory_url_uefi = response_update_service_url.dict['FirmwareInventory']['@odata.id'] + '/' + 'UEFI'
update_firmware_dict_uefi = REDFISH_OBJ.get(firmwareInventory_url_uefi, None)
for pre_req in prereq_list:
# 'lnvgy_fw_xcc_afbt05m-0.02_anyos_noarch'
pre_req_list = pre_req.split('_')
if pre_req_list[2].lower() in ['xcc', 'bmc', 'uefi', 'lxpm', 'drvwn', 'drvln']:
pre_software = pre_req_list[3].split('-')[0][:3].lower()
pre_version = pre_req_list[3].split('-')[1]
if pre_req_list[2].lower() in ['xcc', 'bmc']:
# Check the versions on different machines,for example SR650 and SR635
if 'BMC' in update_firmware_dict_bmc.dict['SoftwareId']:
current_software = update_firmware_dict_bmc.dict['SoftwareId'].split('-')[1][:3].lower()
current_version = update_firmware_dict_bmc.dict['Version'].split('-')[1]
else:
current_software = update_firmware_dict_bmc.dict['SoftwareId'][:3].lower()
current_version = update_firmware_dict_bmc.dict['Version']
if current_software == pre_software:
if current_version >= pre_version:
pre_req_flag_xcc['status'] = True
else:
pre_req_flag_xcc['status'] = False
pre_req_flag_xcc['type'] = 'BMC-Priamry'
pre_req_flag_xcc['current_version'] = current_version
pre_req_flag_xcc['pre_req_version'] = pre_version
else:
continue
elif pre_req_list[2].lower() == 'uefi':
if 'UEFI' in update_firmware_dict_uefi.dict['SoftwareId']:
current_version = update_firmware_dict_uefi.dict['Version'].split('-')[1]
else:
current_version = update_firmware_dict_uefi.dict['Version']
if current_version > pre_version:
pre_req_flag_uefi['status'] = True
else:
pre_req_flag_uefi['status'] = False
pre_req_flag_uefi['type'] = 'UEFI'
pre_req_flag_uefi['current_version'] = current_version
pre_req_flag_uefi['pre_req_version'] = pre_version
else:
pre_req_flag_xcc['status'] = False

if pre_req_flag_xcc['status'] == False or pre_req_flag_uefi['status'] == False:
if pre_req_flag_xcc['status'] != pre_req_flag_uefi['status']:
if pre_req_flag_xcc['status'] == False:
pre_req_flag = pre_req_flag_xcc
else:
pre_req_flag = pre_req_flag_uefi
result = {'ret': False,
'msg': "Update firmware was not performed. "
"Because the current version %s of %s does not meet the pre request of version %s. " %
(pre_req_flag['current_version'], pre_req_flag['type'], pre_req_flag['pre_req_version'])
}
return result
else:
result = {'ret': False,
'msg': "Update firmware was not performed. "
"Because the current version %s of %s does not meet the pre request of version %s and the current version %s of %s does not meet the pre request of version %s. " %
(pre_req_flag_xcc['current_version'], pre_req_flag_xcc['type'],
pre_req_flag_xcc['pre_req_version'],
pre_req_flag_uefi['current_version'], pre_req_flag_uefi['type'],
pre_req_flag_uefi['pre_req_version'])
}
return result
else:
result = {'ret': True}
return result


import paramiko
def sftp_download_xmlfile(xml_file, fsip, fsport, fsusername, fspassword, fsdir):
"""Download the firmware xml file"""
# Connect file server using the fsip, fsusername, and fspassword
download_flag = False
try:
sf = paramiko.Transport((fsip, int(fsport)))
sf.connect(username=fsusername, password=fspassword)
sftp = paramiko.SFTPClient.from_transport(sf)
# Judge whether the file is on SFTP.
files_list = sftp.listdir(fsdir)
remote_path = "/" + fsdir + "/"+ xml_file
local_path = os.getcwd() + os.sep + xml_file

if xml_file in files_list:
sftp.get(remote_path,local_path)
download_flag = True
else:
print("There is no firmware xml file on the remote server, please download the firmware xml file..")
sf.close()
except Exception as e:
print('download exception:', e)
return download_flag


def task_monitor(REDFISH_OBJ, task_uri):
"""Monitor task status"""
END_TASK_STATE = ["Cancelled", "Completed", "Exception", "Killed", "Interrupted", "Suspended", "Done", "Failed when Flashing Image."]
Expand Down Expand Up @@ -251,7 +415,7 @@ def add_helpmessage(argget):
argget.add_argument('--fsport', type=str, default='', help='Specify the file server port')
argget.add_argument('--fsusername', type=str, help='Specify the file server username.')
argget.add_argument('--fspassword', type=str, help='Specify the file server password.')
argget.add_argument('--fsdir', type=str, help='Specify the file server dir to the firmware upload.')
argget.add_argument('--fsdir', type=str, help='Specify the path to the firmware, either on the file server or locally.')


import os
Expand Down