Skip to content

Commit

Permalink
[Mellanox] run module initialization when any SFP related API is called
Browse files Browse the repository at this point in the history
  • Loading branch information
Junchao-Mellanox committed Apr 10, 2024
1 parent bf592f6 commit c7d3ded
Show file tree
Hide file tree
Showing 6 changed files with 1,210 additions and 94 deletions.
244 changes: 213 additions & 31 deletions platform/mellanox/mlnx-platform-api/sonic_platform/chassis.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import os
from functools import reduce
from .utils import extract_RJ45_ports_index
from . import module_host_mgmt_initializer
from . import utils
from .device_data import DeviceDataManager
import re
import queue
import select
import threading
import time
from sonic_platform import modules_mgmt
except ImportError as e:
raise ImportError (str(e) + "- required module not found")

Expand Down Expand Up @@ -132,9 +132,9 @@ def __init__(self):

Chassis.chassis_instance = self

self.modules_mgmt_thread = threading.Thread()
self.modules_changes_queue = queue.Queue()
self.modules_mgmt_task_stopping_event = threading.Event()
self.module_host_mgmt_initializer = module_host_mgmt_initializer.ModuleHostMgmtInitializer()
self.poll_obj = None
self.registered_fds = None

logger.log_info("Chassis loaded successfully")

Expand Down Expand Up @@ -338,8 +338,11 @@ def get_all_sfps(self):
Returns:
A list of objects derived from SfpBase representing all sfps
available on this chassis
"""
self.initialize_sfp()
"""
if DeviceDataManager.is_independent_mode():
self.module_host_mgmt_initializer.initialize(self)
else:
self.initialize_sfp()
return self._sfp_list

def get_sfp(self, index):
Expand All @@ -356,7 +359,10 @@ def get_sfp(self, index):
An object dervied from SfpBase representing the specified sfp
"""
index = index - 1
self.initialize_single_sfp(index)
if DeviceDataManager.is_independent_mode():
self.module_host_mgmt_initializer.initialize(self)
else:
self.initialize_single_sfp(index)
return super(Chassis, self).get_sfp(index)

def get_port_or_cage_type(self, index):
Expand Down Expand Up @@ -406,42 +412,218 @@ def get_change_event(self, timeout=0):
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.modules_mgmt_thread.is_alive():
# open new SFP change events thread
self.modules_mgmt_thread = modules_mgmt.ModulesMgmtTask(q=self.modules_changes_queue
, main_thread_stop_event = self.modules_mgmt_task_stopping_event)
# Set the thread as daemon so when pmon/xcvrd are shutting down, modules_mgmt will shut down immedietly.
self.modules_mgmt_thread.daemon = True
self.modules_mgmt_thread.start()
self.initialize_sfp()
wait_for_ever = (timeout == 0)
if DeviceDataManager.is_independent_mode():
self.module_host_mgmt_initializer.initialize(self)
return self.get_change_event_for_module_host_management_mode(timeout)
else:
self.initialize_sfp()
return self.get_change_event_legacy(timeout)

def get_change_event_for_module_host_management_mode(self, timeout):
"""Get SFP change event when module host management mode is enabled.
Args:
timeout: Timeout in milliseconds (optional). If timeout == 0,
this method will block until a change is detected.
Returns:
(bool, dict):
- True if call successful, False if not; - Deprecated, will always return True
- A nested dictionary where key is a device type,
value is a dictionary with key:value pairs in the format of
{'device_id':'device_event'},
where device_id is the device ID for this device and
device_event,
status='1' represents device inserted,
status='0' represents device removed.
Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.poll_obj:
# SDK always sent event for the first time polling. Such event should not be sent to xcvrd.
# Store SFP state before first time polling so that we can detect dummy event.
self.sfp_states_before_first_poll = {}
self.poll_obj = select.poll()
self.registered_fds = {}
for s in self.get_all_sfps():
fds = s.get_fds_for_poling()
for fd_type, fd in fds.items():
self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
self.registered_fds[fd.fileno()] = (s.sdk_index, fd, fd_type)
self.sfp_states_before_first_poll[s.sdk_index] = s.state

logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

from . import sfp

wait_forever = (timeout == 0)
# poll timeout should be no more than 1000ms to ensure fast shutdown flow
timeout = 1000.0 if timeout >= 1000 else float(timeout)
port_dict = {}
error_dict = {}
begin = time.time()
wait_ready_task = sfp.SFP.get_wait_ready_task()

while True:
fds_events = self.poll_obj.poll(timeout)
for fileno, _ in fds_events:
if fileno not in self.registered_fds:
logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
continue

sfp_index, fd, fd_type = self.registered_fds[fileno]
s = self._sfp_list[sfp_index]
fd_value = int(fd.read().strip())

if sfp_index in self.sfp_states_before_first_poll:
# Detecting dummy event
sfp_state_before_first_poll = self.sfp_states_before_first_poll[sfp_index]
self.sfp_states_before_first_poll.pop(sfp_index)
if s.is_dummy_event(sfp_state_before_first_poll, fd_type, fd_value):
# Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
logger.log_debug(f'Ignore dummy event {fd_type}:{fd_value} for SFP {sfp_index}')
continue

logger.log_notice(f'Got SFP event: index={sfp_index}, type={fd_type}, value={fd_value}')
if fd_type == 'hw_present':
# event could be EVENT_NOT_PRESENT or EVENT_PRESENT
event = sfp.EVENT_NOT_PRESENT if fd_value == 0 else sfp.EVENT_PRESENT
s.on_event(event)
elif fd_type == 'present':
if str(fd_value) == sfp.SFP_STATUS_ERROR:
# FW control cable got an error, no need trigger state machine
sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
port_dict[sfp_index + 1] = sfp_status
if error_desc:
error_dict[sfp_index + 1] = error_desc
continue
elif str(fd_value) == sfp.SFP_STATUS_INSERTED:
# FW control cable got present, only case is that the cable is recovering
# from an error. FW control cable has no transition from "Not Present" to "Present"
# because "Not Present" cable is always "software control" and should always poll
# hw_present sysfs instead of present sysfs.
port_dict[sfp_index + 1] = sfp.SFP_STATUS_INSERTED
continue
else:
s.on_event(sfp.EVENT_NOT_PRESENT)
else:
# event could be EVENT_POWER_GOOD or EVENT_POWER_BAD
event = sfp.EVENT_POWER_BAD if fd_value == 0 else sfp.EVENT_POWER_GOOD
s.on_event(event)

if s.in_stable_state():
s.fill_change_event(port_dict)
s.refresh_poll_obj(self.poll_obj, self.registered_fds)
else:
logger.log_debug(f'SFP {sfp_index} does not reach stable state, state={s.state}')

ready_sfp_set = wait_ready_task.get_ready_set()
for sfp_index in ready_sfp_set:
s = self._sfp_list[sfp_index]
s.on_event(sfp.EVENT_RESET_DONE)
if s.in_stable_state():
s.fill_change_event(port_dict)
s.refresh_poll_obj(self.poll_obj, self.registered_fds)
else:
logger.log_error(f'SFP {sfp_index} failed to reach stable state, state={s.state}')

if port_dict:
logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
self.reinit_sfps(port_dict)
return True, {
'sfp': port_dict,
'sfp_error': error_dict
}
else:
if not wait_forever:
elapse = time.time() - begin
if elapse * 1000 >= timeout:
return True, {'sfp': {}}

def get_change_event_legacy(self, timeout):
"""Get SFP change event when module host management is disabled.
Args:
timeout (int): polling timeout in ms
Returns:
(bool, dict):
- True if call successful, False if not; - Deprecated, will always return True
- A nested dictionary where key is a device type,
value is a dictionary with key:value pairs in the format of
{'device_id':'device_event'},
where device_id is the device ID for this device and
device_event,
status='1' represents device inserted,
status='0' represents device removed.
Ex. {'fan':{'0':'0', '2':'1'}, 'sfp':{'11':'0'}}
indicates that fan 0 has been removed, fan 2
has been inserted and sfp 11 has been removed.
"""
if not self.poll_obj:
self.poll_obj = select.poll()
self.registered_fds = {}
# SDK always sent event for the first time polling. Such event should not be sent to xcvrd.
# Store SFP state before first time polling so that we can detect dummy event.
self.sfp_states_before_first_poll = {}
for s in self.get_all_sfps():
fd = s.get_fd_for_polling_legacy()
self.poll_obj.register(fd, select.POLLERR | select.POLLPRI)
self.registered_fds[fd.fileno()] = (s.sdk_index, fd)
self.sfp_states_before_first_poll[s.sdk_index] = s.get_present_value()

logger.log_debug(f'Registered SFP file descriptors for polling: {self.registered_fds}')

from . import sfp

wait_forever = (timeout == 0)
# poll timeout should be no more than 1000ms to ensure fast shutdown flow
timeout = 1000.0 if timeout >= 1000 else float(timeout)
port_dict = {}
error_dict = {}
begin = time.time()
i = 0

while True:
try:
logger.log_info(f'get_change_event() trying to get changes from queue on iteration {i}')
port_dict = self.modules_changes_queue.get(timeout=timeout / 1000)
logger.log_info(f'get_change_event() iteration {i} port_dict: {port_dict}')
except queue.Empty:
logger.log_info(f"failed to get item from modules changes queue on itertaion {i}")
fds_events = self.poll_obj.poll(timeout)
for fileno, _ in fds_events:
if fileno not in self.registered_fds:
logger.log_error(f'Unknown file no {fileno} from poll event, registered files are {self.registered_fds}')
continue

sfp_index, fd = self.registered_fds[fileno]
fd.seek(0)
sfp_status = fd.read().strip()

if sfp_index in self.sfp_states_before_first_poll:
# Detecting dummy event
sfp_state_before_poll = self.sfp_states_before_first_poll[sfp_index]
self.sfp_states_before_first_poll.pop(sfp_index)
if sfp_state_before_poll == sfp_status:
# Ignore dummy event for the first poll, assume SDK only provide 1 dummy event
logger.log_debug(f'Ignore dummy event {sfp_status} for SFP {sfp_index}')
continue

logger.log_notice(f'Got SFP event: index={sfp_index}, value={sfp_status}')
if sfp_status == sfp.SFP_STATUS_ERROR:
s = self._sfp_list[sfp_index]
sfp_status, error_desc = s.get_error_info_from_sdk_error_type()
if error_desc:
error_dict[sfp_index + 1] = error_desc
port_dict[sfp_index + 1] = sfp_status

if port_dict:
logger.log_notice(f'Sending SFP change event: {port_dict}, error event: {error_dict}')
self.reinit_sfps(port_dict)
result_dict = {'sfp': port_dict}
result_dict['sfp_error'] = error_dict
return True, result_dict
return True, {
'sfp': port_dict,
'sfp_error': error_dict
}
else:
if not wait_for_ever:
if not wait_forever:
elapse = time.time() - begin
logger.log_info(f"get_change_event: wait_for_ever {wait_for_ever} elapse {elapse} iteartion {i}")
if elapse * 1000 >= timeout:
logger.log_info(f"elapse {elapse} > timeout {timeout} iteartion {i} returning empty dict")
return True, {'sfp': {}}
i += 1

def reinit_sfps(self, port_dict):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from .device_data import DeviceDataManager
from . import utils
from sonic_py_common.logger import Logger

import atexit
import os
import sys
import threading

MODULE_READY_MAX_WAIT_TIME = 300
MODULE_READY_CHECK_INTERVAL = 5
MODULE_READY_CONTAINER_FILE = '/tmp/module_host_mgmt_ready'
MODULE_READY_HOST_FILE = '/tmp/nv-syncd-shared/module_host_mgmt_ready'
DEDICATE_INIT_DAEMON = 'xcvrd'
initialization_owner = False

logger = Logger()


class ModuleHostMgmtInitializer:
def __init__(self):
self.initialized = False
self.lock = threading.Lock()

def initialize(self, chassis):
global initialization_owner
if self.initialized:
return

if utils.is_host():
self.wait_module_ready()
else:
cmd = os.path.basename(sys.argv[0])
if DEDICATE_INIT_DAEMON in cmd:
if not self.initialized:
with self.lock:
if not self.initialized:
logger.log_notice('Starting module initialization for module host management...')
initialization_owner = True
self.remove_module_ready_file()

chassis.initialize_sfp()

from .sfp import SFP
SFP.initialize_sfp_modules(chassis._sfp_list)

self.create_module_ready_file()
self.initialized = True
logger.log_notice('Module initialization for module host management done')
else:
self.wait_module_ready()

@classmethod
def create_module_ready_file(cls):
with open(MODULE_READY_CONTAINER_FILE, 'w'):
pass

@classmethod
def remove_module_ready_file(cls):
if os.path.exists(MODULE_READY_CONTAINER_FILE):
os.remove(MODULE_READY_CONTAINER_FILE)

def wait_module_ready(self):
if utils.is_host():
module_ready_file = MODULE_READY_HOST_FILE
else:
module_ready_file = MODULE_READY_CONTAINER_FILE

if os.path.exists(module_ready_file):
self.initialized = True
return
else:
print('Waiting module to be initialized...')

if utils.wait_until(os.path.exists, MODULE_READY_MAX_WAIT_TIME, MODULE_READY_CHECK_INTERVAL, module_ready_file):
self.initialized = True
else:
logger.log_error('Module initialization timeout', True)


@atexit.register
def clean_up():
if initialization_owner:
ModuleHostMgmtInitializer.remove_module_ready_file()
Loading

0 comments on commit c7d3ded

Please sign in to comment.