Merge pull request DIRACGrid#6047 from rupozzi/hackfix
[integration] Hackathon fixes v8.0.0a20 of Monitoring issues
fstagni authored May 12, 2022
2 parents 9fbc009 + 1988989 commit e10cf8e
Showing 18 changed files with 73 additions and 95 deletions.
8 changes: 4 additions & 4 deletions dirac.cfg
@@ -859,11 +859,11 @@ Operations
# For more info https://dirac.readthedocs.io/en/integration/AdministratorGuide/Systems/MonitoringSystem/index.html
Default = Accounting
# WMSHistory = Monitoring
# PilotSubmission = Accounting
# DataOperation = Accounting, Monitoring
# Agent = ...
# Service = ...
# RMS = ...
# PilotSubmissionMonitoring = Accounting
# AgentMonitoring = ...
# ServiceMonitoring = ...
# RMSMonitoring = ...
}
# This is the default section of operations.
# Any value here can be overwriten in the setup specific section
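For reference, a minimal sketch (not part of this diff) of how the renamed options are resolved at runtime; it assumes a standard DIRAC installation with the CS values above in place:

from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations

# Resolve the backends configured for a given component type; this falls back to
# MonitoringBackends/Default ("Accounting" above) when no type-specific flag is set.
ops = Operations()
print(ops.getMonitoringBackends(monitoringType="AgentMonitoring"))
print(ops.getMonitoringBackends(monitoringType="PilotSubmissionMonitoring"))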
@@ -98,12 +98,12 @@ This can be done either via the CS or directly in the web app in the Configurati
MonitoringBackends
{
# WMSHistory = Monitoring
# PilotSubmission = Accounting
# DataOperation = Accounting, Monitoring
# PilotsHistory = ...
# Agent = ...
# Service = ...
# RMS = ...
# PilotSubmissionMonitoring = Accounting
# AgentMonitoring = ...
# ServiceMonitoring = ...
# RMSMonitoring = ...
}
}
}
4 changes: 2 additions & 2 deletions src/DIRAC/Core/Base/AgentModule.py
@@ -140,7 +140,7 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties={}):

self.activityMonitoring = False
# Check if monitoring is enabled
if "Monitoring" in Operations().getMonitoringBackends(monitoringType="Agent"):
if "Monitoring" in Operations().getMonitoringBackends(monitoringType="AgentMonitoring"):
self.activityMonitoring = True

def __getCodeInfo(self):
@@ -371,7 +371,7 @@ def am_go(self):
self.activityMonitoringReporter.addRecord(
{
"AgentName": self.agentName,
"Timestamp": int(Time.toEpoch()),
"timestamp": int(Time.toEpoch()),
"Host": Network.getFQDN(),
"MemoryUsage": mem,
"CpuPercentage": cpuPercentage,
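For illustration, a hedged sketch (not in this PR) of the record AgentModule now builds, using the lowercase "timestamp" key introduced above; the agent name and figures are hypothetical:

from DIRAC.Core.Utilities import Network, Time
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter

reporter = MonitoringReporter(monitoringType="AgentMonitoring")
reporter.addRecord(
    {
        "AgentName": "Framework_SomeAgent",  # hypothetical agent name
        "timestamp": int(Time.toEpoch()),    # lowercase key, as in the hunk above
        "Host": Network.getFQDN(),
        "MemoryUsage": 123,
        "CpuPercentage": 2,
    }
)
result = reporter.commit()
if not result["OK"]:
    print(result["Message"])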
12 changes: 2 additions & 10 deletions src/DIRAC/Core/DISET/private/Service.py
@@ -64,7 +64,7 @@ def __init__(self, serviceData):
self.__maxFD = 0
self.activityMonitoring = False
# Check if monitoring is enabled
if "Monitoring" in Operations().getMonitoringBackends(monitoringType="Service"):
if "Monitoring" in Operations().getMonitoringBackends(monitoringType="ServiceMonitoring"):
self.activityMonitoring = True

def setCloneProcessId(self, cloneId):
@@ -561,21 +561,17 @@ def _mbConnect(self, trid, handlerObj=None):

def _executeAction(self, trid, proposalTuple, handlerObj):
try:
initialWallTime, initialCPUTime, mem = self.__startReportToMonitoring()
response = handlerObj._rh_executeAction(proposalTuple)
if not response["OK"]:
return response
if self.activityMonitoring:
percentage = self.__endReportToMonitoring(initialWallTime, initialCPUTime)
self.activityMonitoringReporter.addRecord(
{
"timestamp": int(Time.toEpoch()),
"Host": Network.getFQDN(),
"serviceName": "_".join(self._name.split("/")),
"ServiceName": "_".join(self._name.split("/")),
"Location": self._cfg.getURL(),
"ResponseTime": response["Value"][1],
"MemoryUsage": mem,
"CpuPercentage": percentage,
}
)
return response["Value"][0]
@@ -584,7 +580,6 @@ def _executeAction(self, trid, proposalTuple, handlerObj):
return S_ERROR("Server error while executing action: %s" % str(e))

def _mbReceivedMsg(self, trid, msgObj):
initialWallTime, initialCPUTime, mem = self.__startReportToMonitoring()
result = self._authorizeProposal(
("Message", msgObj.getName()), trid, self._transportPool.get(trid).getConnectingCredentials()
)
@@ -596,16 +591,13 @@
handlerObj = result["Value"]
response = handlerObj._rh_executeMessageCallback(msgObj)
if self.activityMonitoring and response["OK"]:
percentage = self.__endReportToMonitoring(initialWallTime, initialCPUTime)
self.activityMonitoringReporter.addRecord(
{
"timestamp": int(Time.toEpoch()),
"Host": Network.getFQDN(),
"ServiceName": "_".join(self._name.split("/")),
"Location": self._cfg.getURL(),
"ResponseTime": response["Value"][1],
"MemoryUsage": mem,
"CpuPercentage": percentage,
}
)
if response["OK"]:
2 changes: 1 addition & 1 deletion src/DIRAC/Core/Tornado/Client/TornadoClient.py
@@ -69,7 +69,7 @@ def receiveFile(self, destFile, *args):
In practice, it calls the remote method `streamToClient` and stores the raw result in a file
:param str destFile: path where to store the result
:param destFile: file (or path) where to store the result
:param args: list of arguments
:returns: S_OK/S_ERROR
"""
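A hedged usage sketch of the widened parameter (the service name and file id are hypothetical); together with the TornadoBaseClient change below, either form should work:

from DIRAC.Core.Tornado.Client.TornadoClient import TornadoClient

client = TornadoClient("Framework/SomeTornadoService")  # hypothetical service
# pass a destination path, as before
res = client.receiveFile("/tmp/plot.png", "someFileId")
# or pass an already-open binary file object
with open("/tmp/plot.png", "wb") as destFile:
    res = client.receiveFile(destFile, "someFileId")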
13 changes: 9 additions & 4 deletions src/DIRAC/Core/Tornado/Client/private/TornadoBaseClient.py
@@ -23,7 +23,7 @@
"""

# pylint: disable=broad-except

import io
from io import open
import errno
import os
@@ -480,7 +480,7 @@ def _request(self, retry=0, outputFile=None, **kwargs):
Sends the request to server
:param retry: internal parameters for recursive call. TODO: remove ?
:param outputFile: (default None) path to a file where to store the received data.
:param outputFile: (default None) can be the path to a file, or the file itself where to store the received data.
If set, the server response will be streamed for optimization
purposes, and the response data will not go through the
JDecode process
@@ -599,10 +599,15 @@ def _request(self, retry=0, outputFile=None, **kwargs):
rawText = r.text
r.raise_for_status()

with open(outputFile, "wb") as f:
if isinstance(outputFile, io.IOBase):
for chunk in r.iter_content(4096):
# if chunk: # filter out keep-alive new chunks
f.write(chunk)
outputFile.write(chunk)
else:
with open(outputFile, "wb") as f:
for chunk in r.iter_content(4096):
# if chunk: # filter out keep-alive new chunks
f.write(chunk)

return S_OK()

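The same branch, pulled out as a standalone sketch of the path-or-file-object dispatch (an illustrative helper, not part of DIRAC):

import io

def writeChunks(outputFile, chunks):
    """Write an iterable of byte chunks to an open binary file object or to a path."""
    if isinstance(outputFile, io.IOBase):
        for chunk in chunks:
            outputFile.write(chunk)
    else:
        with open(outputFile, "wb") as f:
            for chunk in chunks:
                f.write(chunk)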
2 changes: 1 addition & 1 deletion src/DIRAC/Core/Tornado/Server/TornadoServer.py
@@ -104,7 +104,7 @@ def __init__(self, services=True, endpoints=False, port=None):
self.__monitoringLoopDelay = 60 # In secs

self.activityMonitoring = False
if "Monitoring" in Operations().getMonitoringBackends(monitoringType="Service"):
if "Monitoring" in Operations().getMonitoringBackends(monitoringType="ServiceMonitoring"):
self.activityMonitoring = True
# If services are defined, load only these ones (useful for debug purpose or specific services)
retVal = self.handlerManager.loadServicesHandlers()
1 change: 1 addition & 0 deletions src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
@@ -657,6 +657,7 @@ def __sendAccounting(self, ftsJob):

self.dataOpSender.sendData(
ftsJob.accountingDict,
commitFlag=True,
delayedCommit=True,
startTime=fromString(ftsJob.submitTime),
endTime=fromString(ftsJob.lastUpdate),
53 changes: 33 additions & 20 deletions src/DIRAC/MonitoringSystem/Client/DataOperationSender.py
@@ -25,35 +25,36 @@ class DataOperationSender:
def __init__(self):
monitoringType = "DataOperation"
# Will use the `MonitoringBackends/Default` value as monitoring backend unless a flag for `MonitoringBackends/DataOperation` is set.
self.monitoringOption = Operations().getMonitoringBackends(monitoringType)
if "Monitoring" in self.monitoringOption:
self.monitoringOptions = Operations().getMonitoringBackends(monitoringType)
if "Monitoring" in self.monitoringOptions:
self.dataOperationReporter = MonitoringReporter(monitoringType)
if "Accounting" in self.monitoringOption:
if "Accounting" in self.monitoringOptions:
self.dataOp = DataOperation()

def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=False, endTime=False):
"""
Sends the input to Monitoring or Accounting based on the monitoringOption
Sends the input to Monitoring or Accounting based on the monitoringOptions
:param dict baseDict: contains a key/value pair
:param bool commitFlag: decides whether to commit the record or not.
:param bool delayedCommit: decides whether to commit the record with delay (only for sending to Accounting)
:param int startTime: epoch time, start time of the plot
:param int endTime: epoch time, end time of the plot
"""
if "Monitoring" in self.monitoringOption:

def sendMonitoring(self):
baseDict["ExecutionSite"] = DIRAC.siteName()
baseDict["Channel"] = baseDict["Source"] + "->" + baseDict["Destination"]
self.dataOperationReporter.addRecord(baseDict)
if commitFlag or delayedCommit:
if commitFlag:
result = self.dataOperationReporter.commit()
sLog.debug("Committing data operation to monitoring")
if not result["OK"]:
sLog.error("Could not commit data operation to monitoring", result["Message"])
else:
sLog.debug("Done committing to monitoring")

if "Accounting" in self.monitoringOption:
def sendAccounting(self):
self.dataOp.setValuesFromDict(baseDict)
if startTime:
self.dataOp.setStartTime(startTime)
@@ -81,25 +82,37 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=False, endTime=False):
sLog.error("Could not delay-commit data operation to accounting")
return result

# Send data and commit prioritizing the first monitoring option in the list
for backend in self.monitoringOptions:
func = locals()[f"send{backend}"]
res = func()
if not res["OK"]:
return res

return S_OK()

# Call this method in order to commit all records added but not yet committed to Accounting and Monitoring
def concludeSending(self):
def monitoringCommitting():
result = self.dataOperationReporter.commit()
sLog.debug("Committing data operation to monitoring")
if not result["OK"]:
sLog.error("Could not commit data operation to monitoring", result["Message"])
sLog.debug("Done committing to monitoring")

def accountingCommitting():
def commitAccounting():
result = gDataStoreClient.commit()
sLog.debug("Concluding the sending and committing data operation to accounting")
if not result["OK"]:
sLog.error("Could not commit data operation to accounting", result["Message"])
sLog.debug("Done committing to accounting")
sLog.debug("Committing to accounting concluded")
return result

if "Monitoring" in self.monitoringOption:
monitoringCommitting()
if "Accounting" in self.monitoringOption:
accountingCommitting()
def commitMonitoring():
result = self.dataOperationReporter.commit()
sLog.debug("Committing data operation to monitoring")
if not result["OK"]:
sLog.error("Could not commit data operation to monitoring", result["Message"])
sLog.debug("Committing to monitoring concluded")
return result

# Commit data prioritizing first monitoring option in the list
for backend in self.monitoringOptions:
func = locals()[f"commit{backend}"]
res = func()
if not res["OK"]:
return res
return S_OK()
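A hedged usage sketch of the reworked sender; "Source" and "Destination" are the keys referenced above, the remaining fields and SE names are illustrative:

from DIRAC.MonitoringSystem.Client.DataOperationSender import DataOperationSender

sender = DataOperationSender()
res = sender.sendData(
    {
        "Source": "SOME-SRC-SE",  # hypothetical storage elements
        "Destination": "SOME-DST-SE",
        "TransferTotal": 1,
        "TransferOK": 1,
        "TransferSize": 1024,
    },
    commitFlag=True,
)
if not res["OK"]:
    print(res["Message"])
# flush anything still buffered towards Accounting and/or Monitoring
sender.concludeSending()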
2 changes: 0 additions & 2 deletions src/DIRAC/MonitoringSystem/Client/Types/AgentMonitoring.py
@@ -21,7 +21,6 @@ def __init__(self):
]

self.monitoringFields = [
"RunningTime",
"MemoryUsage",
"CpuPercentage",
"CycleDuration",
@@ -35,7 +34,6 @@ def __init__(self):
"AgentName": {"type": "keyword"},
"Status": {"type": "keyword"},
"Location": {"type": "keyword"},
"RunningTime": {"type": "long"},
"MemoryUsage": {"type": "long"},
"CpuPercentage": {"type": "long"},
"CycleDuration": {"type": "long"},
2 changes: 0 additions & 2 deletions src/DIRAC/MonitoringSystem/Client/Types/ServiceMonitoring.py
@@ -21,7 +21,6 @@ def __init__(self):
]

self.monitoringFields = [
"RunningTime",
"MemoryUsage",
"CpuPercentage",
"Connections",
@@ -41,7 +40,6 @@ def __init__(self):
"ServiceName": {"type": "keyword"},
"Status": {"type": "keyword"},
"Location": {"type": "keyword"},
"RunningTime": {"type": "long"},
"MemoryUsage": {"type": "long"},
"CpuPercentage": {"type": "long"},
"Connections": {"type": "long"},
5 changes: 0 additions & 5 deletions src/DIRAC/MonitoringSystem/DB/MonitoringDB.py
@@ -13,11 +13,6 @@
MonitoringTypes
{
ComponentMonitoring
{
# Indexing strategy. Possible values: day, week, month, year, null
Period = month
}
RMSMonitoring
{
# Indexing strategy. Possible values: day, week, month, year, null
@@ -39,10 +39,10 @@ def export_streamToClient(self, fileId):
self.log.exception("Exception while generating plot", str(e))
result = S_ERROR("Error while generating plot: %s" % str(e))
if not result["OK"]:
return S_OK(b64encode(generateErrorMessagePlot(result["Message"])).decode())
return generateErrorMessagePlot(result["Message"])
fileId = result["Value"]

retVal = gDataCache.getPlotData(fileId)
if not retVal["OK"]:
return S_OK(b64encode(generateErrorMessagePlot(result["Message"])).decode())
return S_OK(b64encode(retVal["Value"]).decode())
return generateErrorMessagePlot(result["Message"])
return retVal["Value"]