Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vulnerabilities Corrected: Tar Slip and Path Traversal #90

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions moshi/moshi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import sphn
import torch


from .client_utils import make_log
from .models import loaders, MimiModel, LMModel, LMGen

Expand Down Expand Up @@ -166,11 +165,25 @@ async def send_loop():
return ws


def safe_extract(tar: tarfile.TarFile, path: Path, members=None, *, numeric_owner=False):
"""
Safely extract tar files to prevent Path Traversal vulnerabilities.
"""
for member in tar.getmembers():
member_path = Path(path) / member.name
try:
# Resolve the absolute path and ensure it is within the target directory
member_path.resolve().relative_to(path.resolve())
except ValueError:
raise Exception(f"Malicious file detected: {member.name}")
tar.extractall(path=path, members=members, numeric_owner=numeric_owner)


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="localhost", type=str)
parser.add_argument("--port", default=8998, type=int)
parser.add_argument("--static", type=str)
parser.add_argument("--static", type=str, help="Directory for static content")
parser.add_argument("--gradio-tunnel", action='store_true', help='Activate a gradio tunnel.')
parser.add_argument("--gradio-tunnel-token",
help='Provide a custom (secret) token here to keep getting the same URL.')
Expand Down Expand Up @@ -223,27 +236,53 @@ def main():
app = web.Application()
app.router.add_get("/api/chat", state.handle_chat)
static_path: None | str = None

if args.static is None:
log("info", "retrieving the static content")
dist_tgz = hf_hub_download("kyutai/moshi-artifacts", "dist.tgz")
dist_tgz = Path(dist_tgz)
dist = dist_tgz.parent / "dist"
if not dist.exists():
with tarfile.open(dist_tgz, "r:gz") as tar:
tar.extractall(path=dist_tgz.parent)
safe_extract(tar, path=dist_tgz.parent)
static_path = str(dist)
elif args.static != "none":
elif args.static.lower() != "none":
# When set to the "none" string, we don't serve any static content.
static_path = args.static

if static_path is not None:
async def handle_root(_):
return web.FileResponse(os.path.join(static_path, "index.html"))
# Resolve the static path to an absolute path
static_path = Path(static_path).resolve()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the typer will complain with this. I see you didn't run pre-commit hooks

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precommit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precommit


log("info", f"serving static content from {static_path}")
# Ensure that the static path exists and is a directory
if not static_path.is_dir():
log("error", f"The provided static path is not a valid directory: {static_path}")
sys.exit(1)

# Optional: Define a base directory to restrict static content
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not want to have features that are based on uncommented lines of code. Could you remove this block?

# Uncomment and set to your base directory if needed
# base_static_dir = Path("/var/www/static").resolve()
# if not static_path.is_relative_to(base_static_dir):
# log("error", "Attempt to access a directory outside the allowed base directory.")
# sys.exit(1)

async def handle_root(_):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we do not pretend to be resilient to a wrongly formed static, I think I would rather have a mucher simpler implementation, closer to the original, simply adding before the serving:

assert (Path(static_path) / 'index.html').is_file(), "index.html missing in static folder."

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precommit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precommit

try:
index_file = static_path / "index.html"
if not index_file.is_file():
return web.Response(status=404, text="Index file not found.")
return web.FileResponse(index_file)
except Exception as e:
log("error", f"Error serving index.html: {e}")
return web.Response(status=500, text="Internal Server Error.")

log("info", f"Serving static content from {static_path}")
app.router.add_get("/", handle_root)
app.router.add_static(
"/", path=static_path, follow_symlinks=True, name="static"
"/", path=str(static_path), follow_symlinks=False, name="static",
# Add more restrictions if necessary, such as allowing only certain methods
)

log("info", f"Access the Web UI directly at http://{args.host}:{args.port}")
if setup_tunnel is not None:
tunnel = setup_tunnel('localhost', args.port, tunnel_token, None)
Expand Down
Loading