Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add nsenter support (WIP) #117

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions podman_hpc/nsenter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from subprocess import Popen, PIPE
import json
import os
import time


"""
This provides a method to use nsenter to spawn the
MPI tasks.
"""


# Lookup table: namespace type name (as found in the "type" field of the
# lsns JSON output) -> the nsenter command-line switch appended for it.
ns2flag = dict(
    cgroup="-C",
    ipc="-i",
    mnt="-m",
    net="-n",
    pid="-p",
    time="-T",
    user="-U",
    uts="-u",
)


def get_env(pid, conf):
    """
    Construct the environment for the exec command.

    The result starts as a copy of the environment of process ``pid``
    (read from ``/proc/<pid>/environ``) and is then updated from every
    ``-e`` option found in ``conf.shared_run_exec_args``:

    * ``-e PREFIX*``    copy every variable of the current process whose
                        name starts with ``PREFIX``.
    * ``-e NAME=VALUE`` set ``NAME`` to ``VALUE``.
    * ``-e NAME``       copy ``NAME`` from the current process when it is
                        set there; otherwise leave the result untouched.

    Arguments that do not directly follow a ``-e`` are ignored.

    Returns a dict mapping variable names to their values.
    """
    # Gather the environment from the run command.  Entries are
    # NUL-separated; drop empty fragments (e.g. after the trailing NUL).
    with open(f"/proc/{pid}/environ") as f:
        entries = [e for e in f.read().split('\x00') if e]

    new_env = {}
    for entry in entries:
        name, sep, value = entry.partition("=")
        # An entry without "=" cannot be mapped to NAME=VALUE; skip it
        # rather than abort the whole launch.
        if sep:
            new_env[name] = value

    # Find any environments that should be passed
    take_next = False
    for arg in conf.shared_run_exec_args:
        # Find environment flags: the value is the argument after "-e"
        if arg == "-e":
            take_next = True
            continue
        if not take_next:
            continue
        take_next = False

        if arg.endswith('*'):
            prefix = arg[:-1]
            for name, value in os.environ.items():
                if name.startswith(prefix):
                    new_env[name] = value
        elif '=' in arg:
            name, value = arg.split("=", maxsplit=1)
            new_env[name] = value
        elif arg in os.environ:
            # Bare NAME: forward the current process's value for NAME
            # itself (not for some unrelated, previously seen key).
            new_env[arg] = os.environ[arg]
    return new_env


def nsenter(conf, timer, args):
    """
    Run a command inside the namespaces of the shared-run process.

    Polls ``lsns -J`` until one of the listed namespaces has a command
    equal to the space-joined ``conf.shared_run_command``, sleeping
    ``conf.wait_poll_interval`` seconds between attempts.  ``timer.check()``
    is called at the start of every attempt; anything it raises propagates
    to the caller.

    Once the process is found, ``args`` is run through ``/usr/bin/nsenter``
    targeting that pid, with one switch for every namespace type lsns
    lists for the pid.  The child's environment comes from ``get_env``.

    Returns the exit code of the nsenter process.
    """

    lsns_cmd = ["lsns", "-J"]
    shared_run_command = " ".join(conf.shared_run_command)
    pid = None
    namespaces = []

    while not pid:
        timer.check()
        lsns = Popen(lsns_cmd, stdout=PIPE, stderr=PIPE)
        out, _ = lsns.communicate()
        try:
            data = json.loads(out.decode())
        except json.JSONDecodeError:
            # No (or unparsable) output yet: try again later
            time.sleep(conf.wait_poll_interval)
            continue
        # Treat a document without a "namespaces" list as "nothing found"
        # instead of crashing with KeyError/TypeError.
        namespaces = data.get('namespaces', []) if isinstance(data, dict) \
            else []
        for entry in namespaces:
            # No break: when several entries match, the last one wins
            if entry.get('command') == shared_run_command:
                pid = entry.get('pid')
        if not pid:
            time.sleep(conf.wait_poll_interval)

    cmd = ["/usr/bin/nsenter",
           '-t', str(pid), '-U',
           "--preserve-credentials"
           ]
    for entry in namespaces:
        if entry.get('pid') != pid:
            continue
        # Skip namespace types we have no switch for, and do not repeat
        # a switch that is already present (e.g. the fixed '-U' above).
        flag = ns2flag.get(entry.get('type'))
        if flag and flag not in cmd:
            cmd.append(flag)

    cmd.extend(args)
    new_env = get_env(pid, conf)
    proc = Popen(cmd, env=new_env)
    proc.communicate()
    return proc.returncode
139 changes: 81 additions & 58 deletions podman_hpc/podman_hpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .siteconfig import SiteConfig
from multiprocessing import Process
from subprocess import Popen, PIPE
from .nsenter import nsenter


def podman_devnull(cmd, conf):
Expand Down Expand Up @@ -47,6 +48,50 @@ def pmi_fd():
return ["--preserve-fds", "1"]


class Timer():
    """
    Deadline guard used while waiting for the shared-run to start.

    The deadline is ``conf.wait_timeout`` seconds after construction.
    ``run_thread`` may be None; otherwise it must expose an ``exitcode``
    attribute, which is inspected on every ``check()``.
    """

    def __init__(self, conf, run_thread):
        # Use the monotonic clock so that wall-clock adjustments cannot
        # shorten or stretch the timeout.
        self.end_time = time.monotonic() + conf.wait_timeout
        self.run_thread = run_thread

    def check(self):
        """
        Raise OSError if the deadline has passed or if ``run_thread``
        reports a truthy (non-zero) exit code; otherwise return None.
        """
        if time.monotonic() > self.end_time:
            raise OSError("Timeout waiting for shared-run start")
        if self.run_thread and self.run_thread.exitcode:
            raise OSError("Failed to start container")


def _podman_exec(conf, timer, cli_opts,
                 options, container_cmd):
    """
    Wait for ``conf.container_name`` and run ``container_cmd`` in it
    via ``podman exec``.

    The ``container exists`` query is repeated every
    ``conf.wait_poll_interval`` seconds until ``podman_devnull`` returns 0;
    ``timer.check()`` runs after each sleep and may raise to abort the
    wait.  A ``wait --condition running`` is then issued before the exec.

    If ``PMI_FD`` is set in the current environment, that descriptor is
    passed to the child and the variable is copied into ``conf.env``.

    Returns the exit code of the exec process.
    """
    poll = conf.wait_poll_interval

    # wait for container to exist
    exists_query = ["container", "exists", conf.container_name]
    time.sleep(poll)
    while podman_devnull(exists_query, conf) != 0:
        time.sleep(poll)
        timer.check()

    running_query = ["wait", "--condition", "running", conf.container_name]
    podman_devnull(running_query, conf)

    # Assemble the exec command line in one go (evaluated left to right)
    exec_cmd = [
        conf.podman_bin,
        "exec",
        *cli_opts,
        *pmi_fd(),
        *conf.shared_run_exec_args,
        *cpt.filterValidOptions(options, [conf.podman_bin, "exec", "--help"]),
        conf.container_name,
        *container_cmd,
    ]

    # stdio is always handed to the child; PMI_FD only when present
    inherited = [0, 1, 2]
    pmi_value = os.environ.get('PMI_FD')
    if pmi_value is not None:
        inherited.append(int(pmi_value))
        conf.env["PMI_FD"] = pmi_value

    child = Popen(exec_cmd, env=conf.env, pass_fds=inherited)
    child.communicate()
    return child.returncode


# function to specify help message formatting to mimic the podman help page.
# follows the style of click.Command.format_help()
# this will be inherited by subcommands created with @podhpc.command()
Expand Down Expand Up @@ -90,6 +135,9 @@ def podman_format(self, ctx, formatter):
invoke_without_command=True,
)
@click.pass_context
@click.option(
"--nsenter", is_flag=True, help="Use nsenter for shared run", default=False
)
@click.option(
"--additional-stores", type=str, help="Specify other storage locations"
)
Expand All @@ -98,8 +146,8 @@ def podman_format(self, ctx, formatter):
type=str,
help="Specify alternate squash directory location",
)
@click.option("--log-level", type=str, default="fatal", hidden=True)
def podhpc(ctx, additional_stores, squash_dir, log_level):
@click.option("--log-level", type=str, default="error", hidden=True)
def podhpc(ctx, nsenter, additional_stores, squash_dir, log_level):
"""Manage pods, containers and images ... on HPC!

The podman-hpc utility is a wrapper script around the podman
Expand All @@ -114,7 +162,9 @@ def podhpc(ctx, additional_stores, squash_dir, log_level):

# set up site configuration object
try:
conf = SiteConfig(squash_dir=squash_dir, log_level=log_level)
conf = SiteConfig(squash_dir=squash_dir,
log_level=log_level,
nsenter=nsenter)
except Exception as ex:
sys.stderr.write(f"Error: {ex}... Exiting\n")
sys.exit(1)
Expand Down Expand Up @@ -171,6 +221,7 @@ def rmsqi(siteconf, image):
mu = MigrateUtils(conf=siteconf)
mu.remove_image(image)


# podman-hpc images subcommand #############################################
@pass_siteconf
@click.pass_context
Expand All @@ -181,6 +232,7 @@ def images(ctx, siteconf, image, podman_args, **site_opts):
cmd.extend(podman_args)
cmd.extend(siteconf.get_cmd_extensions("images", site_opts))


# podman-hpc pull subcommand (modified) ####################################
@podhpc.command(
context_settings=dict(
Expand Down Expand Up @@ -208,6 +260,7 @@ def pull(ctx, siteconf, image, podman_args, **site_opts):
sys.stderr.write("Pull failed.\n")
sys.exit(proc.returncode)


# podman-hpc shared-run subcommand #########################################
@podhpc.command(
context_settings=dict(
Expand Down Expand Up @@ -242,18 +295,15 @@ def shared_run(conf, run_args, **site_opts):
# click.echo(f"Launching a shared-run with args: {sys.argv}")
_shared_run(conf, run_args, **site_opts)


def _shared_run(conf, run_args, **site_opts):
"""
Internal function for the shared_run. This is so we can
also call it when the user does run but enabled a module
that has shared_run set to True.
that has shared_run set to True.
"""

localid = os.environ.get(conf.localid_var)
ntasks_raw = os.environ.get(conf.tasks_per_node_var, "1")
ntasks = int(re.search(conf.ntasks_pattern, ntasks_raw)[0])
container_name = f"uid-{os.getuid()}-pid-{os.getppid()}"
sock_name = f"/tmp/uid-{os.getuid()}-pid-{os.getppid()}"

# construct run and exec commands from user options
# We need to filter out any run args in the run_args
Expand All @@ -273,59 +323,34 @@ def _shared_run(conf, run_args, **site_opts):
sys.argv.index("shared-run") + 1: sys.argv.index(image)
]

run_cmd = [conf.podman_bin, "run", "--rm", "-d", "--name", container_name]
run_cmd = [conf.podman_bin, "run", "--rm", "-d", "--name",
conf.container_name]
run_cmd.extend(
cpt.filterValidOptions(options, [conf.podman_bin, "run", "--help"])
)
run_cmd.extend(conf.get_cmd_extensions("run", site_opts))
run_cmd.append(image)
run_cmd.extend(conf.shared_run_command)

exec_cmd = [
conf.podman_bin,
"exec",
]
exec_cmd.extend(conf.get_cmd_extensions("exec", site_opts))
exec_cmd.extend(pmi_fd())
exec_cmd.extend(conf.shared_run_exec_args)
exec_cmd.extend(
cpt.filterValidOptions(options, [conf.podman_bin, "exec", "--help"])
)
exec_cmd.extend([container_name] + list(container_cmd))
# click.echo(f"run_cmd is: {run_cmd}")
# click.echo(f"exec_cmd is: {exec_cmd}")

# Start monitor and run threads
monitor_thread = None
run_thread = None
proc = None
if (localid is None or int(localid) == 0):
monitor_thread = Process(target=monitor, args=(sock_name, ntasks,
container_name, conf))
monitor_thread = Process(target=monitor, args=(conf,))
monitor_thread.start()
run_thread = Process(target=shared_run_exec, args=(run_cmd, conf.env))
run_thread.start()

timer = Timer(conf, run_thread)
cli_opts = conf.get_cmd_extensions("exec", site_opts)
try:
# wait for container to exist
comm = ["container", "exists", container_name]
start_time = time.time()
while podman_devnull(comm, conf) != 0:
time.sleep(conf.wait_poll_interval)
if time.time() - start_time > conf.wait_timeout:
msg = "Timeout waiting for shared-run start"
raise OSError(msg)
if run_thread and run_thread.exitcode:
raise OSError("Failed to start container")
comm = ["wait", "--condition", "running", container_name]
podman_devnull(comm, conf)
fds = [0, 1, 2]
if 'PMI_FD' in os.environ:
fds.append(int(os.environ['PMI_FD']))
conf.env["PMI_FD"] = os.environ["PMI_FD"]
proc = Popen(exec_cmd, env=conf.env, pass_fds=fds)
proc.communicate()
send_complete(sock_name, localid)
exit_code = 1
if conf.nsenter:
exit_code = nsenter(conf, timer, container_cmd)
else:
exit_code = _podman_exec(conf, timer, cli_opts,
options, container_cmd)

send_complete(conf.sock_name, localid)
# Close out threads
if monitor_thread:
monitor_thread.join()
Expand All @@ -338,12 +363,9 @@ def _shared_run(conf, run_args, **site_opts):
monitor_thread.kill()
if run_thread:
run_thread.kill()
if os.path.exists(sock_name):
os.remove(sock_name)
if os.path.exists(conf.sock_name):
os.remove(conf.sock_name)
finally:
exit_code = 1
if proc:
exit_code = proc.returncode
sys.exit(exit_code)


Expand Down Expand Up @@ -404,27 +426,28 @@ def shared_run_exec(run_cmd, env):
out, err = proc.communicate()
if proc.returncode != 0:
sys.stderr.write(err.decode())
sys.stderr.write(out.decode())


def monitor(sockfile, ntasks, container_name, conf):
def monitor(conf):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
os.remove(sockfile)
os.remove(conf.sock_name)
except OSError:
pass
s.bind(sockfile)
s.bind(conf.sock_name)
ct = 0
while True:
s.listen()
conn, addr = s.accept()
ct += 1
if ct == ntasks:
if ct == conf.ntasks:
break
conn.close()
os.remove(sockfile)
os.remove(conf.sock_name)
# cleanup
podman_devnull(["kill", container_name], conf)
podman_devnull(["rm", container_name], conf)
podman_devnull(["kill", conf.container_name], conf)
podman_devnull(["rm", conf.container_name], conf)


def send_complete(sockfile, lid):
Expand Down
Loading