Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): upload and scan the target code base into base url #1

Merged
merged 15 commits into from
Aug 22, 2024
Merged
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ on: [ push, pull_request ]
jobs:
test:
runs-on: ubuntu-latest

env:
CODETHREAT_ORG: ${{ secrets.CODETHREAT_ORG }}
CODETHREAT_TOKEN: ${{ secrets.CODETHREAT_TOKEN }}
CODETHREAT_URL: ${{ secrets.CODETHREAT_URL }}
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand Down
72 changes: 68 additions & 4 deletions cli/auth.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,74 @@
import click
import requests
import os
from colorama import Fore, Style, init
from .utils import get_config_value, CONFIG_FILE_PATH

# Initialize colorama
init(autoreset=True)

@click.group()
def auth():
"""Authentication commands for CodeThreat CLI."""
pass

@click.command()
@click.option('--key', prompt=True, hide_input=True, confirmation_prompt=True,
@click.option('--key', prompt=False, hide_input=True, confirmation_prompt=True,
default=lambda: get_config_value("CODETHREAT_SECRET"),
help='Personal Access Token for authentication')
def auth(key):
@click.option('--url', prompt=False,
default=lambda: get_config_value("CODETHREAT_APP_URL"),
help='CodeThreat application URL')
@click.option('--org', prompt=False,
default=lambda: get_config_value("CODETHREAT_ORG"),
help='Organization name')
def login(key, url, org):
"""Authenticate with your Personal Access Token."""
click.echo(f'Authenticated with token: {key}')
# Logic to store the token securely

if not key:
key = click.prompt(f"{Fore.CYAN}[CT*] Personal Access Token{Style.RESET_ALL}", hide_input=True, confirmation_prompt=True)

if not url:
url = click.prompt(f"{Fore.CYAN}[CT*] CodeThreat application URL{Style.RESET_ALL}")

if not org:
org = click.prompt(f"{Fore.CYAN}[CT*] Organization Name{Style.RESET_ALL}")

headers = {
"Authorization": f"Bearer {key}",
"User-Agent": "CodeThreat-CLI",
"x-ct-organization": org
}

click.echo(f"{Fore.CYAN}[CT*] Authenticating...{Style.RESET_ALL}")

# Validate authentication by making a request
validate_url = f"{url}/api/organization?key={org}"
response = requests.get(validate_url, headers=headers)

if response.status_code == 200:
click.echo(f"{Fore.GREEN}[CT*] Authentication successful.{Style.RESET_ALL}")

# Save to config file if not already in env
with open(CONFIG_FILE_PATH, "w") as config_file:
config_file.write(f"CODETHREAT_SECRET={key}\n")
config_file.write(f"CODETHREAT_APP_URL={url}\n")
config_file.write(f"CODETHREAT_ORG={org}\n")
else:
click.echo(f"{Fore.RED}[CT*] Authentication failed: {response.status_code} - {response.text}{Style.RESET_ALL}")

@click.command()
def remove():
"""Remove the existing authentication configuration."""
if os.path.exists(CONFIG_FILE_PATH):
os.remove(CONFIG_FILE_PATH)
click.echo(f"{Fore.YELLOW}[CT*] Authentication configuration removed successfully.{Style.RESET_ALL}")
else:
click.echo(f"{Fore.RED}[CT*] No authentication configuration found.{Style.RESET_ALL}")

# Add commands to the auth group
auth.add_command(login)
auth.add_command(remove)

if __name__ == "__main__":
auth()
22 changes: 18 additions & 4 deletions cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,27 @@
from cli.auth import auth
from cli.scan import scan

# Define version information here
VERSION = "0.1.0"

@click.group()
def main():
"""CodeThreat CLI - A toolset for SAST."""
pass

# Custom help display with an enhanced ASCII art banner
@click.group(
invoke_without_command=True,
help="""
CodeThreat CLI - A Comprehensive Scanner Integration in your local environment!
""",
context_settings=dict(help_option_names=['-h', '--help']),
)
@click.version_option(VERSION, '-v', '--version', message="CodeThreat CLI Version: %(version)s")
@click.pass_context
def main(ctx):
"""CodeThreat CLI - A Toolset for SAST (Static Application Security Testing)."""
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help()) # Show help if no command is invoked


# Registering commands
main.add_command(auth)
main.add_command(scan)

Expand Down
179 changes: 176 additions & 3 deletions cli/scan.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,183 @@
import os
import requests
import click
import shutil
import tempfile
import sys
import time
from datetime import datetime
from colorama import Fore, Style, init
from .utils import get_config_value

# Initialize colorama
init(autoreset=True)

SUCCESS_EXIT_CODE = 0
ERROR_EXIT_CODE = 1
FAILURE_EXIT_CODE = 2

@click.command()
@click.option('--target', required=True, help='Path to the target codebase')
@click.option('--project', required=True, help='Project name in CodeThreat')
def scan(target, project):
@click.option('--url', default=lambda: get_config_value("CODETHREAT_APP_URL"), help='CodeThreat application URL')
@click.option('--token', default=lambda: get_config_value("CODETHREAT_SECRET"), help='Personal Access Token')
@click.option('--org', default=lambda: get_config_value("CODETHREAT_ORG"), help='Organization name')
@click.option('--branch', default=None, help='Branch name for the scan (optional)')
@click.option('--policy_id', default=None, help='Policy ID under which the analysis should be processed (optional)')
def scan(target, project, url, token, org, branch, policy_id):
"""Run a SAST scan on the specified target."""
click.echo(f'Scanning target: {target} in project: {project}')
# Logic to connect to the CodeThreat API and trigger the scan

# Ensure the base URL doesn't have a trailing slash
base_url = url.rstrip('/')

headers = {
"Authorization": f"Bearer {token}",
"User-Agent": "CodeThreat-CLI",
"x-ct-organization": org,
}

def get_timestamp():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

def print_message(message, color=Fore.WHITE, newline=True):
if newline:
print(f"{color}[{get_timestamp()}] {message}{Style.RESET_ALL}")
else:
print(f"{color}[{get_timestamp()}] {message}{Style.RESET_ALL}", end="\r")

def poll_scan_status(scan_id):
status_url = f"{base_url}/api/scan/status/{scan_id}"
seen_logs = set()
scan_started = False

# Track the previous values of SAST and SCA severities
previous_sast = {"critical": 0, "high": 0, "medium": 0, "low": 0}
previous_sca = {"critical": 0, "high": 0, "medium": 0, "low": 0}

while True:
status_response = requests.get(status_url, headers=headers)
if status_response.status_code == 200:
status_data = status_response.json()
state = status_data.get('state', 'unknown')

# Print new logs
logs = status_data.get('logs', [])
for log in logs:
log_id = (log['logType'], log['message'], log['create_date'])
if log_id not in seen_logs:
print_message(f"[CT*] {log['logType']}: {log['message']}", Fore.YELLOW)
seen_logs.add(log_id)

# Notify the user when the scan officially starts
if log['logType'] == "information" and log['message'] == "INFO110: Scan is starting...":
if not scan_started:
scan_started = True
print_message(f"[CT*] The scan has officially started. The CLI will notify you when the scan is complete.", Fore.CYAN)

# Display scan metrics if they have changed
sast_severities = status_data.get('sast_severities', {})
sca_severities = status_data.get('sca_severities', {})

sast_changes = {
severity: sast_severities.get(severity, 0) - previous_sast[severity]
for severity in previous_sast
if sast_severities.get(severity, 0) > previous_sast[severity]
}

sca_changes = {
severity: sca_severities.get(severity, 0) - previous_sca[severity]
for severity in previous_sca
if sca_severities.get(severity, 0) > previous_sca[severity]
}

if sast_changes or sca_changes:
# Beautify the findings output
if sca_changes:
print_message("[CT*] New Dependency Issues Found:", Fore.RED)
for severity, count in sca_changes.items():
print_message(f" {Fore.RED if severity == 'critical' else Fore.YELLOW if severity == 'high' else Fore.MAGENTA}• {count} {severity.capitalize()} Issue{'s' if count > 1 else ''}{Style.RESET_ALL}")

if sast_changes:
print_message("[CT*] New Code Issues Found:", Fore.RED)
for severity, count in sast_changes.items():
print_message(f" {Fore.RED if severity == 'critical' else Fore.YELLOW if severity == 'high' else Fore.MAGENTA}• {count} {severity.capitalize()} Issue{'s' if count > 1 else ''}{Style.RESET_ALL}")

# Update previous values
previous_sast.update(sast_severities)
previous_sca.update(sca_severities)

# Check if the scan is complete
if state.lower() == 'end':
print_message(f"[CT*] Scan completed successfully.", Fore.GREEN)
report_url = f"{base_url}/projects/project-details/{project}?tenant={org}"
print_message(f"Check the report here: {report_url}", Fore.BLUE)
sys.exit(SUCCESS_EXIT_CODE)
elif state.lower() in ['failed', 'error']:
print_message(f"[CT*] Scan failed or encountered an error.", Fore.RED)
sys.exit(FAILURE_EXIT_CODE)
else:
print_message(f"[CT*] Failed to retrieve scan status: {status_response.status_code} - {status_response.text}", Fore.RED)
sys.exit(ERROR_EXIT_CODE)

time.sleep(10)

# Check if the project exists
project_check_url = f"{base_url}/api/project?key={project}"
response = requests.get(project_check_url, headers=headers)

if response.status_code == 200:
print_message(f"[CT*] Project '{project}' exists. Proceeding with scan.", Fore.CYAN)
else:
# Create the project if it doesn't exist
create_project_url = f"{base_url}/api/project/add"
project_data = {
"project_name": project,
"description": f"Project {project} created by CodeThreat CLI",
"tags": ["code-threat", "cli"]
}
response = requests.post(create_project_url, headers=headers, json=project_data)

# Correctly handle the response based on the `error` field and `message`
if response.status_code == 200:
response_data = response.json()
if not response_data.get('error') and response_data.get('result', {}).get('message') == "successfull":
print_message(f"[CT*] Project '{project}' created successfully.", Fore.GREEN)
else:
print_message(f"[CT*] Failed to create project '{project}': {response.text}", Fore.RED)
sys.exit(ERROR_EXIT_CODE)
else:
print_message(f"[CT*] Failed to create project '{project}': {response.text}", Fore.RED)
sys.exit(ERROR_EXIT_CODE)

# Zip the target directory in a cross-platform manner
print_message(f"[CT*] Zipping the target directory: {target}", Fore.CYAN)
with tempfile.TemporaryDirectory() as temp_dir:
zip_path = os.path.join(temp_dir, f"{project}.zip")
shutil.make_archive(zip_path.replace(".zip", ""), 'zip', target)

# Upload the zipped directory and start the scan
upload_url = f"{base_url}/api/scan/start"
with open(zip_path, 'rb') as f:
files = {'upfile': (f"{project}.zip", f, 'application/zip')}
data = {
'project': project,
}
if branch:
data['branch'] = branch
if policy_id:
data['policy_id'] = policy_id

response = requests.post(upload_url, headers=headers, files=files, data=data)

if response.status_code == 200 and not response.json().get("error", True):
scan_id = response.json().get('scan_id')
print_message(f"[CT*] Scan started successfully for project '{project}'. Scan ID: {scan_id}", Fore.GREEN)
else:
print_message(f"[CT*] Scan initiation failed: {response.status_code} - {response.text}", Fore.RED)
sys.exit(FAILURE_EXIT_CODE)

# Start the scan status polling in the main thread
poll_scan_status(scan_id)

if __name__ == "__main__":
scan()
21 changes: 21 additions & 0 deletions cli/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os

CONFIG_FILE_PATH = os.path.expanduser("~/.codethreat_cli_config")


def get_config_value(key):
"""Fetch configuration value from environment variable or config file."""
# First, try to get the value from environment variables
value = os.getenv(key)
if value:
return value

# If not in environment variables, try to get it from the config file
if os.path.exists(CONFIG_FILE_PATH):
with open(CONFIG_FILE_PATH, "r") as config_file:
for line in config_file:
if line.startswith(key):
return line.split("=", 1)[1].strip()

# If the key is not found in either, return None
return None
16 changes: 12 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
[tool.commitizen]
name = "cz_conventional_commits"
tag_format = "$version"
tag_format = "$major.$minor.$patch$prerelease"
version_scheme = "semver2"
version_provider = "pep621"
update_changelog_on_bump = true

[project]
name = "codethreat-cli"
version = "0.1.0"
requires-python = ">=3.12"
description = "CLI toolset for CodeThreat SAST"
authors = ["Your Name <[email protected]>"]
authors = [
{ name = "CodeThreat", email = "[email protected]" }
]
dependencies = [
# Your project dependencies

]

[project.urls]
homepage = "https://cloud.codethreat.com"
repository = "https://github.com/codethreat/codethreat-cli"

# Specify the entry points (scripts)
[project.scripts]
codethreat-cli = "cli.main:main"

# Optional: For more modern Python packaging configuration
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"

4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
setuptools~=73.0.1
click~=8.1.7
click~=8.1.7
requests~=2.32.3
colorama~=0.4.6
Loading
Loading