Skip to content

Commit

Permalink
Merge pull request #379 from DUNE-DAQ/thea/k8s_consolidation
Browse files Browse the repository at this point in the history
K8S readout consolidation
  • Loading branch information
plasorak authored Aug 24, 2023
2 parents a0fe335 + 7074e42 commit 30fd67e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 124 deletions.
175 changes: 51 additions & 124 deletions python/daqconf/apps/readout_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from pathlib import Path

from ..core.conf_utils import Direction, Queue
from ..core.sourceid import SourceIDBroker
from ..core.daqmodule import DAQModule
from ..core.app import App, ModuleGraph
from ..detreadoutmap import ReadoutUnitDescriptor, group_by_key
Expand Down Expand Up @@ -72,20 +71,7 @@ def compute_data_types(
queue_frag_type = "TDEFrame"
fakedata_time_tick=4472*32
fakedata_frame_size=8972
# 20-Jun-2023, KAB: quick fix to get FD-specific nightly build to run
## Near detector types
#elif det_str == "NDLAr_TPC":
# fe_type = "pacman"
# fakedata_frag_type = "PACMAN"
# queue_frag_type = "PACMANFrame"
# fakedata_time_tick=None
# fakedata_frame_size=None
#elif det_str == "NDLAr_PDS":
# fe_type = "mpd"
# fakedata_frag_type = "MPD"
# queue_frag_type = "MPDFrame"
# fakedata_time_tick=None
# fakedata_frame_size=None

else:
raise ValueError(f"No match for {det_str}, {stream_entry.kind}")

Expand Down Expand Up @@ -131,10 +117,6 @@ def streams_by_rxiface_and_tx_endpoint(self):
m[k] = group_by_key(v, lambda s: (s.parameters.tx_ip, s.parameters.tx_mac, s.parameters.tx_host))

return m

# def streams_by_ru(self):
# m = group_by_key(self.desc.streams, lambda s: (getattr(s.parameters, self.desc._host_label_map[s.kind]), getattr(s.parameters, self.desc._iflabel_map[s.kind]), s.kind, s.geo_id.det_id))
# return m

def build_conf(self, eal_arg_list, lcores_id_set):

Expand Down Expand Up @@ -280,15 +262,7 @@ def create_fake_cardreader(
modules = [DAQModule(name = "fake_source",
plugin = "FDFakeCardReader",
conf = conf)]
# queues = [
# Queue(
# f"fake_source.output_{s.src_id}",
# f"datahandler_{s.src_id}.raw_input",
# QUEUE_FRAGMENT_TYPE,
# f'{FRONTEND_TYPE}_link_{s.src_id}', 100000
# ) for s in RU_DESCRIPTOR.streams
# ]


queues = []
for s in RU_DESCRIPTOR.streams:
FRONTEND_TYPE, QUEUE_FRAGMENT_TYPE, _, _, _ = compute_data_types(s)
Expand Down Expand Up @@ -367,26 +341,6 @@ def create_felix_cardreader(
)
)]

# # Queues for card reader 1
# queues += [
# Queue(
# f'flxcard_0.output_{idx}',
# f"datahandler_{idx}.raw_input",
# QUEUE_FRAGMENT_TYPE,
# f'{FRONTEND_TYPE}_link_{idx}',
# 100000
# ) for idx in strms_slr0
# ]
# # Queues for card reader 2
# queues += [
# Queue(
# f'flxcard_1.output_{idx}',
# f"datahandler_{idx}.raw_input",
# QUEUE_FRAGMENT_TYPE,
# f'{FRONTEND_TYPE}_link_{idx}',
# 100000
# ) for idx in strms_slr1
# ]

# Queues for card reader 1
for s in strms_slr0:
Expand Down Expand Up @@ -445,16 +399,6 @@ def create_dpdk_cardreader(
lcores_id_set=lcores_id_set
),
)]

# Queues
# queues = [
# Queue(
# f"{nic_reader_name}.output_{stream.src_id}",
# f"datahandler_{stream.src_id}.raw_input", QUEUE_FRAGMENT_TYPE,
# f'{FRONTEND_TYPE}_stream_{stream.src_id}', 100000
# )
# for stream in RU_DESCRIPTOR.streams
# ]

queues = []
for stream in RU_DESCRIPTOR.streams:
Expand All @@ -471,66 +415,17 @@ def create_dpdk_cardreader(
return modules, queues


# def create_pacman_cardreader(
# self,
# FRONTEND_TYPE: str,
# QUEUE_FRAGMENT_TYPE: str,
# RU_DESCRIPTOR # ReadoutUnitDescriptor
# ) -> tuple[list, list]:
# """
# Create a Pacman Cardeader
# """
#
# reader_name = "nd_reader"
# if FRONTEND_TYPE == 'pacman':
# reader_name = "pacman_source"
#
# elif FRONTEND_TYPE == 'mpd':
# reader_name = "mpd_source"
#
# else:
# raise RuntimeError(f"Pacman Cardreader for {FRONTEND_TYPE} not supported")
#
# modules = [DAQModule(
# name=reader_name,
# plugin="PacmanCardReader",
# conf=pcr.Conf(link_confs = [pcr.LinkConfiguration(Source_ID=stream.src_id)
# for stream in RU_DESCRIPTOR.streams],
# zmq_receiver_timeout = 10000)
# )]
#
# # Queues
# queues = [
# Queue(
# f"{reader_name}.output_{stream.src_id}",
# f"datahandler_{stream.src_id}.raw_input", QUEUE_FRAGMENT_TYPE,
# f'{FRONTEND_TYPE}_stream_{stream.src_id}', 100000
# )
# for stream in RU_DESCRIPTOR.streams
# ]
#
return modules, queues





###
# Create detector datalink handlers
###
def create_det_dhl(
self,
# LATENCY_BUFFER_SIZE: int,
LATENCY_BUFFER_NUMA_AWARE: int,
LATENCY_BUFFER_ALLOCATION_MODE: int,
NUMA_ID: int,
SEND_PARTIAL_FRAGMENTS: bool,
# RAW_RECORDING_OUTPUT_DIR: str,
DATA_REQUEST_TIMEOUT: int,
# FRAGMENT_SEND_TIMEOUT: int,
# RAW_RECORDING_ENABLED: bool,
RU_DESCRIPTOR, # ReadoutUnitDescriptor
# EMULATOR_MODE : bool

) -> tuple[list, list]:

Expand Down Expand Up @@ -806,7 +701,6 @@ def generate(
DATA_FILES = data_file_map
DATA_REQUEST_TIMEOUT=data_timeout_requests

# FRONTEND_TYPE, QUEUE_FRAGMENT_TYPE, _, _, _ = compute_data_types(RU_DESCRIPTOR.det_id, self.det_cfg.clock_speed_hz, RU_DESCRIPTOR.kind)

# TPG is automatically disabled for non wib2 frontends
# TPG_ENABLED = TPG_ENABLED and (FRONTEND_TYPE=='wib2' or FRONTEND_TYPE=='wibeth')
Expand Down Expand Up @@ -852,15 +746,6 @@ def generate(
cr_mods += dpdk_mods
cr_queues += dpdk_queues

# elif RU_DESCRIPTOR.kind == 'eth' and RU_DESCRIPTOR.streams[0].parameters.protocol == "zmq":
#
# pac_mods, pac_queues = self.create_pacman_cardreader(
# FRONTEND_TYPE=FRONTEND_TYPE,
# QUEUE_FRAGMENT_TYPE=QUEUE_FRAGMENT_TYPE,
# RU_DESCRIPTOR=RU_DESCRIPTOR
# )
# cr_mods += pac_mods
# cr_queues += pac_queues

modules += cr_mods
queues += cr_queues
Expand Down Expand Up @@ -923,13 +808,55 @@ def generate(
readout_app = App(mgraph, host=RU_DESCRIPTOR.host_name)


# Kubernetes-specific extensions
if RU_DESCRIPTOR.kind == 'flx':
c = card_override if card_override != -1 else RU_DESCRIPTOR.iface
readout_app.resources = {
f"felix.cern/flx{c}-data": "1", # requesting FLX{c}
# "memory": f"{}Gi" # yes bro
}
if cfg.use_fake_cards:
pass
else:
# Kubernetes-specific extensions
# FELIX
if RU_DESCRIPTOR.kind == 'flx':

c = card_override if card_override != -1 else RU_DESCRIPTOR.iface
readout_app.resources = {
f"felix.cern/flx{c}-data": "1", # requesting FLX{c}
# "memory": f"{}Gi" # yes bro
}
readout_app.pod_privileged = True

readout_app.mounted_dirs += [{
'name': 'devfs',
'physical_location': '/dev',
'in_pod_location': '/dev',
'read_only': False,
}]

# DPDK
elif RU_DESCRIPTOR.kind == 'eth':

readout_app.resources = {
f"intel.com/intel_sriov_dpdk": "1", # requesting sriov
}

readout_app.mounted_dirs += [
{
'name': 'devfs',
'physical_location': '/dev',
'in_pod_location': '/dev',
'read_only': False,
},
{
'name': 'linux-firmware',
'physical_location': '/lib/firmware',
'in_pod_location': '/lib/firmware',
'read_only': True,
}
]

# Remove in favour of capabilites
readout_app.pod_privileged = True
readout_app.pod_capabilities += [
"IPC_LOCK",
"CAP_NET_ADMIN"
]

dir_names = set()

Expand Down
2 changes: 2 additions & 0 deletions python/daqconf/core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ def __init__(self, modulegraph=None, host="localhost", name="__app"):
self.resources = {}
self.pod_affinity = []
self.pod_anti_affinity = []
self.pod_privileged = False
self.pod_capabilities = []

def reset_graph(self):
if self.modulegraph:
Expand Down
2 changes: 2 additions & 0 deletions python/daqconf/core/conf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,8 @@ def add_k8s_app_boot_data(
"resources": app.resources,
"affinity": app.pod_affinity,
"anti-affinity": app.pod_anti_affinity,
"privileged": app.pod_privileged,
"capabilities": app.pod_capabilities,
}


Expand Down

0 comments on commit 30fd67e

Please sign in to comment.