From 5d5489f90341bf72c9a8163a72252aecb6dc1c72 Mon Sep 17 00:00:00 2001
From: AltmanD
Date: Mon, 16 Oct 2023 12:29:42 +0800
Subject: [PATCH] feature(luyd): add collector logging in new pipeline (#735)

* Add collector logging in new pipeline

* Reformat

* Reformat

* Fix according to comment
---
 .../middleware/functional/collector.py       | 101 ++++++++++++++++--
 1 file changed, 90 insertions(+), 11 deletions(-)

diff --git a/ding/framework/middleware/functional/collector.py b/ding/framework/middleware/functional/collector.py
index 62d183e6d8..d2fb4483b9 100644
--- a/ding/framework/middleware/functional/collector.py
+++ b/ding/framework/middleware/functional/collector.py
@@ -1,6 +1,9 @@
 from typing import TYPE_CHECKING, Callable, List, Tuple, Any
 from functools import reduce
 import treetensor.torch as ttorch
+import numpy as np
+from ditk import logging
+from ding.utils import EasyTimer
 from ding.envs import BaseEnvManager
 from ding.policy import Policy
 from ding.torch_utils import to_ndarray, get_shape0
@@ -83,7 +86,12 @@ def _inference(ctx: "OnlineRLContext"):
     return _inference
 
 
-def rolloutor(policy: Policy, env: BaseEnvManager, transitions: TransitionList) -> Callable:
+def rolloutor(
+        policy: Policy,
+        env: BaseEnvManager,
+        transitions: TransitionList,
+        collect_print_freq=100,
+) -> Callable:
     """
     Overview:
         The middleware that executes the transition process in the env.
@@ -98,6 +106,13 @@ def rolloutor(policy: Policy, env: BaseEnvManager, transitions: TransitionList)
 
     env_episode_id = [_ for _ in range(env.env_num)]
     current_id = env.env_num
+    timer = EasyTimer()
+    last_train_iter = 0
+    total_envstep_count = 0
+    total_episode_count = 0
+    total_train_sample_count = 0
+    env_info = {env_id: {'time': 0., 'step': 0, 'train_sample': 0} for env_id in range(env.env_num)}
+    episode_info = []
 
     def _rollout(ctx: "OnlineRLContext"):
         """
@@ -113,22 +128,86 @@ def _rollout(ctx: "OnlineRLContext"):
                 trajectory stops.
""" - nonlocal current_id + nonlocal current_id, env_info, episode_info, timer, \ + total_episode_count, total_envstep_count, total_train_sample_count, last_train_iter timesteps = env.step(ctx.action) ctx.env_step += len(timesteps) timesteps = [t.tensor() for t in timesteps] - # TODO abnormal env step + + collected_sample = 0 + collected_step = 0 + collected_episode = 0 + interaction_duration = timer.value / len(timesteps) for i, timestep in enumerate(timesteps): - transition = policy.process_transition(ctx.obs[i], ctx.inference_output[i], timestep) - transition = ttorch.as_tensor(transition) # TBD - transition.collect_train_iter = ttorch.as_tensor([ctx.train_iter]) - transition.env_data_id = ttorch.as_tensor([env_episode_id[timestep.env_id]]) - transitions.append(timestep.env_id, transition) + with timer: + transition = policy.process_transition(ctx.obs[i], ctx.inference_output[i], timestep) + transition = ttorch.as_tensor(transition) + transition.collect_train_iter = ttorch.as_tensor([ctx.train_iter]) + transition.env_data_id = ttorch.as_tensor([env_episode_id[timestep.env_id]]) + transitions.append(timestep.env_id, transition) + + collected_step += 1 + collected_sample += len(transition.obs) + env_info[timestep.env_id.item()]['step'] += 1 + env_info[timestep.env_id.item()]['train_sample'] += len(transition.obs) + + env_info[timestep.env_id.item()]['time'] += timer.value + interaction_duration if timestep.done: - policy.reset([timestep.env_id]) - env_episode_id[timestep.env_id] = current_id + info = { + 'reward': timestep.info['eval_episode_return'], + 'time': env_info[timestep.env_id.item()]['time'], + 'step': env_info[timestep.env_id.item()]['step'], + 'train_sample': env_info[timestep.env_id.item()]['train_sample'], + } + + episode_info.append(info) + policy.reset([timestep.env_id.item()]) + env_episode_id[timestep.env_id.item()] = current_id + collected_episode += 1 current_id += 1 ctx.env_episode += 1 - # TODO log + + total_envstep_count += collected_step + total_episode_count += collected_episode + total_train_sample_count += collected_sample + + if (ctx.train_iter - last_train_iter) >= collect_print_freq and len(episode_info) > 0: + output_log(episode_info, total_episode_count, total_envstep_count, total_train_sample_count) + last_train_iter = ctx.train_iter return _rollout + + +def output_log(episode_info, total_episode_count, total_envstep_count, total_train_sample_count) -> None: + """ + Overview: + Print the output log information. You can refer to the docs of `Best Practice` to understand \ + the training generated logs and tensorboards. + Arguments: + - train_iter (:obj:`int`): the number of training iteration. 
+ """ + episode_count = len(episode_info) + envstep_count = sum([d['step'] for d in episode_info]) + train_sample_count = sum([d['train_sample'] for d in episode_info]) + duration = sum([d['time'] for d in episode_info]) + episode_return = [d['reward'].item() for d in episode_info] + info = { + 'episode_count': episode_count, + 'envstep_count': envstep_count, + 'train_sample_count': train_sample_count, + 'avg_envstep_per_episode': envstep_count / episode_count, + 'avg_sample_per_episode': train_sample_count / episode_count, + 'avg_envstep_per_sec': envstep_count / duration, + 'avg_train_sample_per_sec': train_sample_count / duration, + 'avg_episode_per_sec': episode_count / duration, + 'reward_mean': np.mean(episode_return), + 'reward_std': np.std(episode_return), + 'reward_max': np.max(episode_return), + 'reward_min': np.min(episode_return), + 'total_envstep_count': total_envstep_count, + 'total_train_sample_count': total_train_sample_count, + 'total_episode_count': total_episode_count, + # 'each_reward': episode_return, + } + episode_info.clear() + logging.info("collect end:\n{}".format('\n'.join(['{}: {}'.format(k, v) for k, v in info.items()])))