From 85c34bf7cd8d76bf93c216828bbd77dcdce5b506 Mon Sep 17 00:00:00 2001 From: Riccardo Balbo Date: Mon, 29 Apr 2024 22:32:36 +0200 Subject: [PATCH] improve logging --- src/OpenAgentsNode.py | 86 ++++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 37 deletions(-) diff --git a/src/OpenAgentsNode.py b/src/OpenAgentsNode.py index 30129af..646f7e2 100644 --- a/src/OpenAgentsNode.py +++ b/src/OpenAgentsNode.py @@ -1,5 +1,4 @@ import grpc -import logging from openagents_grpc_proto import rpc_pb2_grpc from openagents_grpc_proto import rpc_pb2 @@ -15,15 +14,8 @@ from threading import Condition import requests -def cnvLogLevel(logLevel): - if logLevel == "debug": return logging.DEBUG - if logLevel == "info": return logging.INFO - if logLevel == "warn": return logging.WARNING - if logLevel == "error": return logging.ERROR - if logLevel == "fine": return logging.DEBUG - if logLevel == "finer": return logging.DEBUG - if logLevel == "finest": return logging.DEBUG - return logging.DEBUG + + class BlobWriter : def __init__(self,writeQueue,res ): self.writeQueue = writeQueue @@ -166,7 +158,7 @@ def __init__(self, filters, meta, template, sockets): self.cachePath = None self.logger = None - self.logger = Logger("JobRunner", self) + self.logger = Logger("JobRunner", "0", self, False) self._filters = filters if not isinstance(meta, str): @@ -325,22 +317,29 @@ def __init__(self, nameOrMeta=None, icon=None, description=None): self.logger = None name = "" + icon = "" + description = "" + version = "0.0.1" if isinstance(nameOrMeta, str): name = nameOrMeta else : - name = nameOrMeta["name"] - icon = nameOrMeta["picture"] - description = nameOrMeta["about"] + name = nameOrMeta["name"] if "name" in nameOrMeta else None + icon = nameOrMeta["picture"] if "picture" in nameOrMeta else None + description = nameOrMeta["about"] if "about" in nameOrMeta else None + version = nameOrMeta["version"] if "version" in nameOrMeta else None + self.nodeName = name or os.getenv('NODE_NAME', "OpenAgentsNode") self.nodeIcon = icon or os.getenv('NODE_ICON', "") + self.nodeVersion = version or os.getenv('NODE_VERSION', "0.0.1") self.nodeDescription = description or os.getenv('NODE_DESCRIPTION', "") + self.channel = None self.rpcClient = None - self.logger = Logger(name) + self.logger = Logger(self.nodeName,self.nodeVersion) def registerRunner(self, runner): - runner.logger=self.logger + runner.logger=Logger(self.nodeName+"."+runner.__class__.__name__,self.nodeVersion,runner) self.runners.append(runner) def getLogger(self): @@ -370,7 +369,10 @@ def getClient(self): return self.rpcClient async def _logToJob(self, message, jobId=None): - await self.getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message)) + try: + await self.getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message)) + except Exception as e: + print("Error logging to job "+str(e)) def log(self,message, jobId=None): if jobId: @@ -400,17 +402,17 @@ async def executePendingJobForRunner(self , runner): ))).jobs) for job in jobs: - if len(jobs)>0 : self.log(str(len(jobs))+" pending jobs") - else : self.log("No pending jobs") + if len(jobs)>0 : runner.getLogger().log(str(len(jobs))+" pending jobs") + else : runner.getLogger().log("No pending jobs") wasAccepted=False t=time.time() try: client = self.getClient() # Reconnect client for each job if not await runner.canRun(job): continue - asyncio.create_task(self._acceptJob(job.id)) + await self._acceptJob(job.id) wasAccepted = True - self.log("Job started on node "+self.nodeName, job.id) + runner.getLogger().info("Job started on node "+self.nodeName) runner._setNode(self) runner._setJob(job) await runner.preRun() @@ -418,24 +420,24 @@ async def task(): try: output=await runner.run(job) await runner.postRun() - self.log("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id) + runner.getLogger().info("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id) await client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output)) except Exception as e: self.failedJobsTracker.append([job.id, time.time()]) - self.log("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) if wasAccepted: await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e))) traceback.print_exc() asyncio.create_task(task()) except Exception as e: self.failedJobsTracker.append([job.id, time.time()]) - self.log("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) + runner.getLogger().error("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id) if wasAccepted: await client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e))) traceback.print_exc() except Exception as e: - self.log("Error executing runner "+str(e), None) traceback.print_exc() + runner.getLogger().error("Error executing runner "+str(e)) await asyncio.sleep(5000.0/1000.0) self.runnerTasks[runner]=asyncio.create_task(self.executePendingJobForRunner(runner)) @@ -447,7 +449,7 @@ async def executePendingJob(self ): if not runner in self.runnerTasks: self.runnerTasks[runner]=asyncio.create_task(self.executePendingJobForRunner(runner)) except Exception as e: - self.log("Error executing pending job "+str(e), None) + self.getLogger().log("Error executing pending job "+str(e), None) async def reannounce(self): @@ -463,9 +465,9 @@ async def reannounce(self): description = self.nodeDescription, )) self.nextNodeAnnounce = int(time.time()*1000) + res.refreshInterval - self.log("Node announced, next announcement in "+str(res.refreshInterval)+" ms") + self.getLogger().log("Node announced, next announcement in "+str(res.refreshInterval)+" ms") except Exception as e: - self.log("Error announcing node "+ str(e), None) + self.getLogger().error("Error announcing node "+ str(e), None) self.nextNodeAnnounce = int(time.time()*1000) + 5000 for runner in self.runners: @@ -478,12 +480,12 @@ async def reannounce(self): sockets=runner._sockets )) runner._nextAnnouncementTimestamp = int(time.time()*1000) + res.refreshInterval - self.log("Template announced, next announcement in "+str(res.refreshInterval)+" ms") + self.getLogger().log("Template announced, next announcement in "+str(res.refreshInterval)+" ms") except Exception as e: - self.log("Error announcing template "+ str(e), None) + self.getLogger().error("Error announcing template "+ str(e), None) runner._nextAnnouncementTimestamp = int(time.time()*1000) + 5000 except Exception as e: - self.log("Error reannouncing "+str(e), None) + self.getLogger().error("Error reannouncing "+str(e), None) await asyncio.sleep(5000.0/1000.0) asyncio.create_task(self.reannounce()) @@ -531,6 +533,10 @@ def log(self, level, message, timestamp=None): '_timestamp': timestamp or int(time.time()*1000), 'message': message } + meta=self.options["meta"] if "meta" in self.options else {} + for key in meta: + log_entry[key]=meta[key] + self.buffer.put(log_entry) if self.buffer.qsize() >= self.batchSize: with self.wait: @@ -571,18 +577,17 @@ def flushLoop(self): class Logger : - - - def __init__(self, name, runner=None): + def __init__(self, name, version, runner=None, enableOobs=True): self.name=name or "main" self.runner=runner self.logger=None self.logLevel=None self.oobsLogger=None + self.version=version self.logLevel = os.getenv('LOG_LEVEL', "debug") oobsEndPoint = os.getenv('OPENOBSERVE_ENDPOINT', None) self.oobsLogLevel= os.getenv('OPENOBSERVE_LOGLEVEL', self.logLevel) - if oobsEndPoint: + if enableOobs and oobsEndPoint: self.oobsLogger = OpenObserveLogger({ "baseUrl": oobsEndPoint, "org": os.getenv('OPENOBSERVE_ORG', "default"), @@ -593,6 +598,10 @@ def __init__(self, name, runner=None): }, "batchSize": int(os.getenv('OPENOBSERVE_BATCHSIZE', 21)), "flushInterval": int(os.getenv('OPENOBSERVE_FLUSHINTERVAL', 0)), + "meta":{ + "appName": self.name, + "appVersion": self.version + } }) @@ -611,10 +620,12 @@ def _log(self, level, message): levelV=self.levelToValue(level) if levelV >=self.levelToValue(self.logLevel): date = time.strftime("%Y-%m-%d %H:%M:%S") - print(date+" ["+self.name+"] : "+level+" : "+message) + print(date+" ["+self.name+":"+self.version+"] : "+level+" : "+message) if self.oobsLogger and self.levelToValue(self.oobsLogLevel) >= levelV: self.oobsLogger.log(level, message) - + + if self.runner and levelV >= self.levelToValue("info"): + self.runner.log(message) def log(self, *args): @@ -628,6 +639,7 @@ def warn(self, *args): def error(self, *args): self._log("error", " ".join([str(x) for x in args])) + traceback.print_exc() def debug(self, *args): self._log("debug", " ".join([str(x) for x in args]))