Skip to content

Commit

Permalink
[Mellanox] run module initialization when any SFP related API is called
Browse files Browse the repository at this point in the history
  • Loading branch information
Junchao-Mellanox committed Apr 16, 2024
1 parent b63ea6c commit 4bf5548
Show file tree
Hide file tree
Showing 22 changed files with 2,141 additions and 2,219 deletions.
248 changes: 217 additions & 31 deletions platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import os
from functools import reduce
from .utils import extract_RJ45_ports_index
from . import module_host_mgmt_initializer
from . import utils
from .device_data import DeviceDataManager
import re
import queue
import select
import threading
import time
from sonic_platform import modules_mgmt
except ImportError as e:
raise ImportError (str(e) + "- required module not found")

Expand Down Expand Up @@ -132,9 +132,9 @@ def __init__(self):

Chassis.chassis_instance = self

self.modules_mgmt_thread = threading.Thread()
self.modules_changes_queue = queue.Queue()
self.modules_mgmt_task_stopping_event = threading.Event()
self.module_host_mgmt_initializer = module_host_mgmt_initializer.ModuleHostMgmtInitializer()
self.poll_obj = None
self.registered_fds = None

logger.log_info("Chassis loaded successfully")

Expand Down Expand Up @@ -344,8 +344,11 @@ def get_all_sfps(self):
Returns:
A list of objects derived from SfpBase representing all sfps
available on this chassis
"""
self.initialize_sfp()
"""
if DeviceDataManager.is_module_host_management_mode():
self.module_host_mgmt_initializer.initialize(self)
else:
self.initialize_sfp()
return self._sfp_list

def get_sfp(self, index):
Expand All @@ -362,7 +365,10 @@ def get_sfp(self, index):
An object dervied from SfpBase representing the specified sfp
"""
index = index - 1
self.initialize_single_sfp(index)
if DeviceDataManager.is_module_host_management_mode():
self.module_host_mgmt_initializer.initialize(self)
else:
self.initialize_single_sfp(index)
return super(Chassis, self).get_sfp(index)

def get_port_or_cage_type(self, index):
Expand Down Expand Up @@ -412,42 +418,222 @@ def get_change_event(self, timeout=0):
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.modules_mgmt_thread.is_alive():
# open new SFP change events thread
self.modules_mgmt_thread = modules_mgmt.ModulesMgmtTask(q=self.modules_changes_queue
, main_thread_stop_event = self.modules_mgmt_task_stopping_event)
# Set the thread as daemon so when pmon/xcvrd are shutting down, modules_mgmt will shut down immedietly.
self.modules_mgmt_thread.daemon = True
self.modules_mgmt_thread.start()
self.initialize_sfp()
wait_for_ever = (timeout == 0)
if DeviceDataManager.is_module_host_management_mode():
self.module_host_mgmt_initializer.initialize(self)
return self.get_change_event_for_module_host_management_mode(timeout)
else:
self.initialize_sfp()
return self.get_change_event_legacy(timeout)

def get_change_event_for_module_host_management_mode(self, timeout):
"""Get SFP change event when module host management mode is enabled.
Args:
timeout: Timeout in milliseconds (optional). If timeout == 0,
this method will block until a change is detected.
Returns:
(bool, dict):
- True if call successful, False if not; - Deprecated, will always return True
- A nested dictionary where key is a device type,
value is a dictionary with key:value pairs in the format of
{'device_id':'device_event'},
where device_id is the device ID for this device and
device_event,
status='1' represents device inserted,
status='0' represents device removed.
Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.poll_obj:
self.poll_obj = select.poll()
self.registered_fds = {}
for s in self._sfp_list:
fds = s.get_fds_for_poling()
for fd_type, fd in fds.items():
self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
self.registered_fds[fd.fileno()] = (s.sdk_index, fd, fd_type)

logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

from . import sfp

wait_forever = (timeout == 0)
# poll timeout should be no more than 1000ms to ensure fast shutdown flow
timeout = 1000.0 if timeout >= 1000 else float(timeout)
port_dict = {}
error_dict = {}
begin = time.time()
wait_ready_task = sfp.SFP.get_wait_ready_task()

while True:
fds_events = self.poll_obj.poll(timeout)
for fileno, _ in fds_events:
if fileno not in self.registered_fds:
logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
continue

sfp_index, fd, fd_type = self.registered_fds[fileno]
s = self._sfp_list[sfp_index]
fd_value = int(fd.read().strip())

# Detecting dummy event
if s.is_dummy_event(fd_type, fd_value):
# Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
logger.log_debug(f'Ignore dummy event {fd_type}:{fd_value} for SFP {sfp_index}')
continue

logger.log_notice(f'Got SFP event: index={sfp_index}, type={fd_type}, value={fd_value}')
if fd_type == 'hw_present':
# event could be EVENT_NOT_PRESENT or EVENT_PRESENT
event = sfp.EVENT_NOT_PRESENT if fd_value == 0 else sfp.EVENT_PRESENT
s.on_event(event)
elif fd_type == 'present':
if str(fd_value) == sfp.SFP_STATUS_ERROR:
# FW control cable got an error, no need trigger state machine
sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
port_dict[sfp_index + 1] = sfp_status
if error_desc:
error_dict[sfp_index + 1] = error_desc
continue
elif str(fd_value) == sfp.SFP_STATUS_INSERTED:
# FW control cable got present, only case is that the cable is recovering
# from an error. FW control cable has no transition from "Not Present" to "Present"
# because "Not Present" cable is always "software control" and should always poll
# hw_present sysfs instead of present sysfs.
port_dict[sfp_index + 1] = sfp.SFP_STATUS_INSERTED
continue
else:
s.on_event(sfp.EVENT_NOT_PRESENT)
else:
# event could be EVENT_POWER_GOOD or EVENT_POWER_BAD
event = sfp.EVENT_POWER_BAD if fd_value == 0 else sfp.EVENT_POWER_GOOD
s.on_event(event)

if s.in_stable_state():
s.fill_change_event(port_dict)
s.refresh_poll_obj(self.poll_obj, self.registered_fds)
else:
logger.log_debug(f'SFP {sfp_index} does not reach stable state, state={s.state}')

ready_sfp_set = wait_ready_task.get_ready_set()
for sfp_index in ready_sfp_set:
s = self._sfp_list[sfp_index]
s.on_event(sfp.EVENT_RESET_DONE)
if s.in_stable_state():
s.fill_change_event(port_dict)
s.refresh_poll_obj(self.poll_obj, self.registered_fds)
else:
logger.log_error(f'SFP {sfp_index} failed to reach stable state, state={s.state}')

if port_dict:
logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
self.reinit_sfps(port_dict)
return True, {
'sfp': port_dict,
'sfp_error': error_dict
}
else:
if not wait_forever:
elapse = time.time() - begin
if elapse * 1000 >= timeout:
return True, {'sfp': {}}

def get_change_event_legacy(self, timeout):
"""Get SFP change event when module host management is disabled.
Args:
timeout (int): polling timeout in ms
Returns:
(bool, dict):
- True if call successful, False if not; - Deprecated, will always return True
- A nested dictionary where key is a device type,
value is a dictionary with key:value pairs in the format of
{'device_id':'device_event'},
where device_id is the device ID for this device and
device_event,
status='1' represents device inserted,
status='0' represents device removed.
Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.poll_obj:
self.poll_obj = select.poll()
self.registered_fds = {}
# SDK always sent event for the first time polling. Such event should not be sent to xcvrd.
# Store SFP state before first time polling so that we can detect dummy event.
self.sfp_states_before_first_poll = {}
for s in self._sfp_list:
fd = s.get_fd_for_polling_legacy()
self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
self.registered_fds[fd.fileno()] = (s.sdk_index, fd)
self.sfp_states_before_first_poll[s.sdk_index] = s.get_module_status()

logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

from . import sfp

wait_forever = (timeout == 0)
# poll timeout should be no more than 1000ms to ensure fast shutdown flow
timeout = 1000.0 if timeout >= 1000 else float(timeout)
port_dict = {}
error_dict = {}
begin = time.time()
i = 0

while True:
try:
logger.log_info(f'get_change_event() trying to get changes from queue on iteration {i}')
port_dict = self.modules_changes_queue.get(timeout=timeout / 1000)
logger.log_info(f'get_change_event() iteration {i} port_dict: {port_dict}')
except queue.Empty:
logger.log_info(f"failed to get item from modules changes queue on itertaion {i}")
fds_events = self.poll_obj.poll(timeout)
for fileno, _ in fds_events:
if fileno not in self.registered_fds:
logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
continue

sfp_index, fd = self.registered_fds[fileno]
fd.seek(0)
fd.read()
s = self._sfp_list[sfp_index]
sfp_status = s.get_module_status()

if sfp_index in self.sfp_states_before_first_poll:
# Detecting dummy event
sfp_state_before_poll = self.sfp_states_before_first_poll[sfp_index]
self.sfp_states_before_first_poll.pop(sfp_index)
if sfp_state_before_poll == sfp_status:
# Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
logger.log_debug(f'Ignore dummy event {sfp_status} for SFP {sfp_index}')
continue

logger.log_notice(f'Got SFP event: index={sfp_index}, value={sfp_status}')
if sfp_status == sfp.SFP_STATUS_UNKNOWN:
# in the following sequence, STATUS_UNKNOWN can be returned.
# so we shouldn't raise exception here.
# 1. some sfp module is inserted
# 2. sfp_event gets stuck and fails to fetch the change event instantaneously
# 3. and then the sfp module is removed
# 4. sfp_event starts to try fetching the change event
logger.log_notice("unknown module state, maybe the port suffers two adjacent insertion/removal")
continue

if sfp_status == sfp.SFP_STATUS_ERROR:
sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
if error_desc:
error_dict[sfp_index + 1] = error_desc
port_dict[sfp_index + 1] = sfp_status

if port_dict:
logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
self.reinit_sfps(port_dict)
result_dict = {'sfp': port_dict}
result_dict['sfp_error'] = error_dict
return True, result_dict
return True, {
'sfp': port_dict,
'sfp_error': error_dict
}
else:
if not wait_for_ever:
if not wait_forever:
elapse = time.time() - begin
logger.log_info(f"get_change_event: wait_for_ever {wait_for_ever} elapse {elapse} iteartion {i}")
if elapse * 1000 >= timeout:
logger.log_info(f"elapse {elapse} > timeout {timeout} iteartion {i} returning empty dict")
return True, {'sfp': {}}
i += 1

def reinit_sfps(self, port_dict):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020-2023 NVIDIA CORPORATION & AFFILIATES.
# Copyright (c) 2020-2024 NVIDIA CORPORATION & AFFILIATES.
# Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -242,7 +242,7 @@ def get_cpld_component_list(cls):

@classmethod
@utils.read_only_cache()
def is_independent_mode(cls):
def is_module_host_management_mode(cls):
from sonic_py_common import device_info
_, hwsku_dir = device_info.get_paths_to_platform_and_hwsku_dirs()
sai_profile_file = os.path.join(hwsku_dir, 'sai.profile')
Expand All @@ -258,7 +258,7 @@ def wait_platform_ready(cls):
"""
conditions = []
sysfs_nodes = ['power_mode', 'power_mode_policy', 'present', 'reset', 'status', 'statuserror']
if cls.is_independent_mode():
if cls.is_module_host_management_mode():
sysfs_nodes.extend(['control', 'frequency', 'frequency_support', 'hw_present', 'hw_reset',
'power_good', 'power_limit', 'power_on', 'temperature/input'])
else:
Expand Down
Loading

0 comments on commit 4bf5548

Please sign in to comment.