diff --git a/coreos-ostree-importer/Dockerfile b/coreos-ostree-importer/Dockerfile new file mode 100644 index 0000000..cd020d8 --- /dev/null +++ b/coreos-ostree-importer/Dockerfile @@ -0,0 +1,28 @@ +FROM registry.fedoraproject.org/fedora:30 + +# set PYTHONUNBUFFERED env var to non-empty string so that our +# periods with no newline get printed immediately to the screen +ENV PYTHONUNBUFFERED=true + +# Get any latest updates since last container spin +RUN dnf update -y && dnf clean all + +# Install boto/fedmsg/ostree libraries +RUN dnf -y install python3-boto3 fedora-messaging ostree && dnf clean all + +# Put the file into a location that can be imported +ADD coreos_ostree_importer.py /usr/lib/python3.7/site-packages/ + +# Copy in the fedora messaging config into the +# default location +ADD fedora-messaging-config.toml /etc/fedora-messaging/config.toml + +# Environment variable to be defined by the user that defines the +# location of the AWS credentials file and also the path to the +# filesystem path to the keytab file. If blank it will be ignored +# and privileged (write) operations won't be attempted +ENV AWS_CONFIG_FILE '' + +# Call fedora-messaging CLI and tell it to use the Consumer +# class from the included module. +CMD fedora-messaging consume --callback=coreos_ostree_importer:Consumer diff --git a/coreos-ostree-importer/README.md b/coreos-ostree-importer/README.md new file mode 100644 index 0000000..c318aaf --- /dev/null +++ b/coreos-ostree-importer/README.md @@ -0,0 +1,115 @@ +# coreos-ostree-importer + +Source code that watches for ostree-import requests on the fedora +messaging bus and imports those commit objects into the ostree +repositories managed by Fedora infra/releng teams. + +# Rough notes for running locally: + +If you'd like to use a local rabbitmq server setup you'll need to modify the +`amqp_url` at the top of the `fedora-messaging-config.toml` file +to point to your local server. For example: `amqp_url = "amqp://192.168.121.2"` +See the [later section](#running-rabbitmq-server-locally) +on that topic and update your fedora messaging config accordingly. + +From your local git directory: + +``` +podman build -t coreos-ostree-importer . +``` + +Create a file with aws credentials somewhere: + +``` +cat <<'EOF' > /dev/shm/secret +[default] +aws_access_key_id=keyid +aws_secret_access_key=key +EOF +``` + +Create some empty OSTree repos: + +``` +mkdir /srv/prodrepo +mkdir /srv/composerepo + +ostree --repo=/srv/prodrepo init --mode=archive +ostree --repo=/srv/composerepo init --mode=archive +``` + +Run the importer: + +``` +podman run -it --rm \ + -v $PWD/:/pwd/ \ + -v /dev/shm/secret:/.aws/config \ + -e AWS_CONFIG_FILE=/.aws/config \ + -v /srv/composerepo/:/mnt/koji/compose/ostree/repo/:z \ + -v /srv/prodrepo/:/mnt/koji/ostree/repo/:z \ + coreos-ostree-importer +``` + + +If you'd like you can add `--entrypoint=/bin/bash` and run +`/pwd/coreos_koji_tagger.py` directly. If you modify the json at the top +of the file you can test out the import locally. + + +# Running rabbitmq server locally: + + +## Server + +The rough steps for setting up a server are: + +- `sudo dnf install -y fedora-messaging rabbitmq-server` +- `sudo systemctl start rabbitmq-server` + +Optional - to see a web browser view: + +- `sudo sed -i -e 's|@RABBITMQ_USER@|rabbitmq|' -e 's|@RABBITMQ_GROUP@|rabbitmq|' /usr/sbin/rabbitmq-plugins` + - https://bugzilla.redhat.com/show_bug.cgi?id=1755152 +- `sudo rabbitmq-plugins enable rabbitmq_management` +- Navigate to `:15672` in a web browser and log in with `guest`/`guest`. +- Navigate to `Queues` tab to view existing queues/messages. + +## Fedora Messaging consumer + +If you want to see the `request.ostree-import.finished` messages sent by the ostree-importer +you can run the following command on the on the same system that is running the rabbitmq server. + +``` +fedora-messaging consume --callback=fedora_messaging.example:printer --routing-key org.fedoraproject.prod.coreos.build.request.ostree-import.finished +``` + +## Fedora Messaging sender + +If you'd like to send a `request.ostree-import` message to rabbitmq (i.e. letting the +ostree-importer listen and react to the message) you can do something like this python file +on the rabbitmq server: + +``` +cat <<'EOF' > publisher.py +#!/usr/bin/python3 +from fedora_messaging import api, message +topic = 'org.fedoraproject.prod.coreos.build.request.ostree-import' +body = { + "build_id": "30.20190905.0", + "stream": "testing", + "basearch": "x86_64", + "commit": "s3://fcos-builds/prod/streams/testing/builds/30.20190905.0/x86_64/ostree-commit.tar", + "checksum": "sha256:d01db6939e7387afa2492ac8e2591c53697fc21cf16785585f7f1ac0de692863", + "ostree_ref": "fedora/x86_64/coreos/testing", + "ostree_checksum": "b4beca154dab3696fd04f32ddab818102caa9247ec3192403adb9aaecc991bd9", + "target_repo": "prod" +} +api.publish(message.Message(topic=topic, body=body)) +EOF +``` + +You'll have to update the body with new information you'd like to use. Then run: + +``` +./publisher.py +``` diff --git a/coreos-ostree-importer/coreos_ostree_importer.py b/coreos-ostree-importer/coreos_ostree_importer.py new file mode 100755 index 0000000..82602d4 --- /dev/null +++ b/coreos-ostree-importer/coreos_ostree_importer.py @@ -0,0 +1,300 @@ +#!/usr/bin/python3 + +import boto3 +import botocore +import fedora_messaging +import fedora_messaging.api +import hashlib +import json +import logging +import os +import subprocess +import sys +import tarfile +import tempfile +import traceback + +# Set local logging +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +FEDORA_MESSAGING_TOPIC_LISTEN = ( + "org.fedoraproject.prod.coreos.build.request.ostree-import" +) +FEDORA_MESSAGING_TOPIC_RESPOND = FEDORA_MESSAGING_TOPIC_LISTEN + ".finished" + + +# We are processing the org.fedoraproject.prod.coreos.build.request.ostree-import topic +# https://apps.fedoraproject.org/datagrepper/raw?topic=org.fedoraproject.prod.coreos.build.request.ostree-import&delta=100000 +# The schema was originally designed in: +# https://github.com/coreos/fedora-coreos-tracker/issues/198#issuecomment-513944390 +EXAMPLE_MESSAGE_BODY = json.loads(""" +{ + "build_id": "30.20190905.0", + "stream": "testing", + "basearch": "x86_64", + "commit": "s3://fcos-builds/prod/streams/testing/builds/30.20190905.0/x86_64/ostree-commit.tar", + "checksum": "sha256:d01db6939e7387afa2492ac8e2591c53697fc21cf16785585f7f1ac0de692863", + "ostree_ref": "fedora/x86_64/coreos/testing", + "ostree_checksum": "b4beca154dab3696fd04f32ddab818102caa9247ec3192403adb9aaecc991bd9", + "target_repo": "prod" +} +""" +) + +KNOWN_OSTREE_REPOS = { + "prod": "/mnt/koji/ostree/repo", + "compose": "/mnt/koji/compose/ostree/repo", +} + +# Given a repo (and thus an input JSON) analyze existing koji tag set +# and tag in any missing packages +class Consumer(object): + def __init__(self): + # Check the possible repos to make sure they exist + for path in KNOWN_OSTREE_REPOS.values(): + if not ostree_repo_exists(path): + raise Exception(f"OSTree repo does not exist at {path}") + + logger.info( + "Processing messages with topic: %s" % FEDORA_MESSAGING_TOPIC_LISTEN + ) + + def __call__(self, message: fedora_messaging.api.Message): + # Catch any exceptions and don't raise them further because + # it will cause /usr/bin/fedora-messaging to crash and we'll + # lose the traceback logs from the container + try: + self.process(message) + logger.info("Sending SUCCESS message") + send_message(msg=message.body, status="SUCCESS") + except Exception as e: + logger.error("Caught Exception!") + logger.error("###################################") + traceback.print_exc() + logger.error("###################################") + logger.error("Replying with a FAILURE message...") + send_message(msg=message.body, status="FAILURE") + logger.error("\t continuing...") + pass + + def process(self, message: fedora_messaging.api.Message): + logger.debug(message.topic) + logger.debug(message.body) + + # Grab the raw message body and parse out pieces + msg = message.body + basearch = msg["basearch"] + build_id = msg["build_id"] + checksum = msg["checksum"] + commit_url = msg["commit"] + ostree_checksum = msg["ostree_checksum"] + ostree_ref = msg["ostree_ref"] + stream = msg["stream"] + target_repo = msg["target_repo"] + + # Qualify arguments + if not checksum.startswith("sha256:"): + raise Exception("checksum value must start with sha256:") + if target_repo not in KNOWN_OSTREE_REPOS.keys(): + raise Exception(f"Provided target repo is unknown: {target_repo}") + + sha256sum = checksum[7:] + target_repo_path = KNOWN_OSTREE_REPOS[target_repo] + source_repo_path = None + + # Detect if the commit already exists in the target repo + # NOTE: We assume here that an import won't be requested twice for + # the same commit (i.e. someone adds detached metadata and + # then does a second import request). + if ostree_commit_exists(target_repo_path, ostree_checksum): + logger.info( + f"Commit {ostree_checksum} already exists in the target repo. " + "Skipping import" + ) + return + + # Import the OSTree commit to the specified repo. We'll use + # a temporary directory to untar the repo into. + with tempfile.TemporaryDirectory() as tmpdir: + # If the target repo is the prod repo the commit could + # already have been imported into the compose repo. If it + # is already in the compose repo then let's just pull-local + # from there to save downloading all from the net again. + if target_repo == "prod" and ostree_commit_exists( + repo=KNOWN_OSTREE_REPOS["compose"], commit=ostree_checksum + ): + logger.info("Commit exists in compose repo. Importing from there") + source_repo_path = KNOWN_OSTREE_REPOS["compose"] + else: + # Grab the file from s3 and then pull local + untar_file_from_s3(url=commit_url, tmpdir=tmpdir, sha256sum=sha256sum) + source_repo_path = tmpdir + + # one more sanity check: make sure buildid == version + assert_commit_has_version( + repo=source_repo_path, commit=ostree_checksum, version=build_id + ) + # Import the commit into the target repo + ostree_pull_local( + commit=ostree_checksum, + dstrepo=target_repo_path, + srcrepo=source_repo_path, + branch=ostree_ref, + ) + + +def runcmd(cmd: list, **kwargs: int) -> subprocess.CompletedProcess: + try: + # default args to pass to subprocess.run + pargs = {"check": True, "capture_output": True} + logger.debug(f"Running command: {cmd}") + pargs.update(kwargs) + cp = subprocess.run(cmd, **pargs) + except subprocess.CalledProcessError as e: + logger.error("Command returned bad exitcode") + logger.error(f"COMMAND: {cmd}") + logger.error(f" STDOUT: {e.stdout.decode()}") + logger.error(f" STDERR: {e.stderr.decode()}") + raise e + return cp # subprocess.CompletedProcess + + +def send_message(msg: dict, status: str): + # Send back a message with all the original message body + # along with an additional `status:` header with either + # `SUCCESS` or `FAILURE`. + fedora_messaging.api.publish( + fedora_messaging.message.Message( + topic=FEDORA_MESSAGING_TOPIC_RESPOND, body={"status": status, **msg} + ) + ) + + +# https://stackoverflow.com/a/55542529 +def get_sha256sum(filepath: str) -> str: + h = hashlib.sha256() + with open(filepath, "rb") as file: + while True: + # Reading is buffered, so we can read smaller chunks. + chunk = file.read(h.block_size) + if not chunk: + break + h.update(chunk) + return h.hexdigest() + + +def parse_s3_url(url: str) -> tuple: + if not url.startswith("s3://"): + raise Exception(f"Unable to parse the s3 url: {url}") + # Chop off s3:// and break into bucket / key + bucket, key = url[5:].split("/", 1) + return (bucket, key) + + +def untar_file_from_s3(url: str, tmpdir: str, sha256sum: str): + filename = "ostree.tar" + filepath = os.path.join(tmpdir, filename) + + # Grab file from s3 + logger.info(f"Downloading object from s3: {url}") + s3 = boto3.client("s3") + bucket, key = parse_s3_url(url) + s3.download_file(bucket, key, filepath) + + # Verify file has correct checksum + calcuatedsum = get_sha256sum(filepath) + if sha256sum != calcuatedsum: + raise Exception("Checksums do not match: " f"{sha256sum} != {calcuatedsum}") + + # Untar the file into the temporary directory + with tarfile.open(filepath) as tar: + tar.extractall(path=tmpdir) + + +def ostree_pull_local(srcrepo: str, dstrepo: str, branch: str, commit: str): + # verify the parent commit of the new commit is in the destination repo + # and also that the current branch in the repo points to it + branch_exists = ostree_branch_exists(repo=dstrepo, branch=branch) + parent = ostree_get_parent_commit(repo=srcrepo, commit=commit) + if branch_exists: + assert_branch_points_to_commit(repo=dstrepo, branch=branch, commit=parent) + # pull content + logger.info("Running ostree pull-local to perform import") + cmd = ["ostree", f"--repo={dstrepo}", "pull-local", srcrepo, commit] + runcmd(cmd) + # update branch + if branch_exists: + cmd = ["ostree", f"--repo={dstrepo}", "reset", branch, commit] + else: + cmd = ["ostree", f"--repo={dstrepo}", "refs", f"--create={branch}", commit] + logger.info(f"Updating branch {branch} -> {commit} in {dstrepo}") + runcmd(cmd) + # update summary file + logger.info("Updating summary file") + cmd = ["ostree", f"--repo={dstrepo}", "summary", "-u"] + runcmd(cmd) + + +def ostree_repo_exists(repo: str) -> bool: + if not os.path.exists(repo): + return False + cmd = ["ostree", f"--repo={repo}", "refs"] + if runcmd(cmd, check=False).returncode != 0: + logger.debug(f"OSTree repo does not exist at {repo}") + return False + return True + + +def ostree_commit_exists(repo: str, commit: str) -> bool: + cmd = ["ostree", f"--repo={repo}", "show", commit] + return runcmd(cmd, check=False).returncode == 0 + + +def ostree_branch_exists(repo: str, branch: str) -> bool: + cmd = ["ostree", f"--repo={repo}", "rev-parse", branch] + return runcmd(cmd, check=False).returncode == 0 + + +def ostree_get_parent_commit(repo: str, commit: str) -> str: + cmd = ["ostree", f"--repo={repo}", "rev-parse", f"{commit}^"] + return runcmd(cmd, check=True).stdout.strip().decode() + + +def assert_branch_points_to_commit(repo: str, branch: str, commit: str): + cmd = ["ostree", f"--repo={repo}", "rev-parse", branch] + cp = runcmd(cmd, check=True) + detected = cp.stdout.strip().decode() + logger.debug(f"{branch} points to {detected}") + if commit != detected: + raise Exception(f"{branch} points to {detected}. Expected {commit}") + + +def assert_commit_has_version(repo: str, commit: str, version: str): + cmd = ["ostree", f"--repo={repo}", "show", commit, "--print-metadata-key=version"] + cp = runcmd(cmd, check=True) + embeddedversion = cp.stdout.replace(b"'", b"").strip().decode() + if version != embeddedversion: + raise Exception( + "Embedded commit version does not match buildid " + f"{version} != {embeddedversion}" + ) + + +# The code in this file is expected to be run through fedora messaging +# However, you can run the script directly for testing purposes. The +# below code allows us to do that and also fake feeding data to the +# call by updating the json text below. +if __name__ == "__main__": + sh = logging.StreamHandler() + sh.setFormatter( + logging.Formatter("%(asctime)s %(levelname)s %(name)s - %(message)s") + ) + logger.addHandler(sh) + + m = fedora_messaging.api.Message( + topic="org.fedoraproject.prod.coreos.build.request.ostree-import", + body=EXAMPLE_MESSAGE_BODY, + ) + c = Consumer() + c.__call__(m) diff --git a/coreos-ostree-importer/fedora-messaging-config.toml b/coreos-ostree-importer/fedora-messaging-config.toml new file mode 100644 index 0000000..87b6294 --- /dev/null +++ b/coreos-ostree-importer/fedora-messaging-config.toml @@ -0,0 +1,83 @@ +# This file is in the TOML format. +amqp_url = "amqps://coreos:@rabbitmq.fedoraproject.org/%2Fpubsub" +callback = "fedora_messaging.example:printer" + +# pick up the key/cert from the same directory as the messaging config +[tls] +ca_cert = "/etc/fedora-messaging/cacert.pem" +keyfile = "coreos.key" +certfile = "coreos.crt" + +# Set the Application name/url/email +[client_properties] +app = "CoreOS OSTree Importer" +app_url = "https://github.com/coreos/fedora-coreos-releng-automation/tree/master/coreos-ostree-importer" +app_contacts_email = ["coreos@lists.fedoraproject.org"] + +[exchanges."amq.topic"] +type = "topic" +durable = true +auto_delete = false +arguments = {} + +# We'll use the coreos queue name +[queues.coreos] +durable = true +auto_delete = false +exclusive = true +arguments = {} + +# We care about the ostree-import message topic +[[bindings]] +queue = "coreos" +exchange = "amq.topic" +routing_keys = ["org.fedoraproject.prod.coreos.build.request.ostree-import"] + +[consumer_config] +example_key = "for my consumer" + +[qos] +prefetch_size = 0 +prefetch_count = 25 + +[log_config] +version = 1 +disable_existing_loggers = true + +# Adjust the log formatting based on preference +[log_config.formatters.simple] +format = "%(asctime)s %(levelname)s %(name)s - %(message)s" + +[log_config.handlers.console] +class = "logging.StreamHandler" +formatter = "simple" +stream = "ext://sys.stdout" + +# Set level to WARNING, otherwise too chatty +[log_config.loggers.fedora_messaging] +level = "WARNING" +propagate = false +handlers = ["console"] + +# Set level to WARNING, otherwise too chatty +[log_config.loggers.twisted] +level = "WARNING" +propagate = false +handlers = ["console"] + +[log_config.loggers.pika] +level = "WARNING" +propagate = false +handlers = ["console"] + +# If your consumer sets up a logger, you must add a configuration for it +# here in order for the messages to show up. e.g. if it set up a logger +# called 'example_printer', you could do: +#[log_config.loggers.example_printer] +#level = "INFO" +#propagate = false +#handlers = ["console"] + +[log_config.root] +level = "ERROR" +handlers = ["console"] diff --git a/coreos-ostree-importer/fedora-messaging-config.toml.orig b/coreos-ostree-importer/fedora-messaging-config.toml.orig new file mode 100644 index 0000000..2f8ff0a --- /dev/null +++ b/coreos-ostree-importer/fedora-messaging-config.toml.orig @@ -0,0 +1,91 @@ +# A basic configuration for Fedora's message broker, using the example callback +# which simply prints messages to standard output. +# +# This file is in the TOML format. +amqp_url = "amqps://fedora:@rabbitmq.fedoraproject.org/%2Fpublic_pubsub" +callback = "fedora_messaging.example:printer" + +[tls] +ca_cert = "/etc/fedora-messaging/cacert.pem" +keyfile = "/etc/fedora-messaging/fedora-key.pem" +certfile = "/etc/fedora-messaging/fedora-cert.pem" + +[client_properties] +app = "Example Application" +# Some suggested extra fields: +# URL of the project that provides this consumer +app_url = "https://github.com/fedora-infra/fedora-messaging" +# Contact emails for the maintainer(s) of the consumer - in case the +# broker admin needs to contact them, for e.g. +app_contacts_email = ["jcline@fedoraproject.org"] + +[exchanges."amq.topic"] +type = "topic" +durable = true +auto_delete = false +arguments = {} + +# Queue names *must* be in the normal UUID format: run "uuidgen" and use the +# output as your queue name. If your queue is not exclusive, anyone can connect +# and consume from it, causing you to miss messages, so do not share your queue +# name. Any queues that are not auto-deleted on disconnect are garbage-collected +# after approximately one hour. +# +# If you require a stronger guarantee about delivery, please talk to Fedora's +# Infrastructure team. +[queues.00000000-0000-0000-0000-000000000000] +durable = false +auto_delete = true +exclusive = true +arguments = {} + +[[bindings]] +queue = "00000000-0000-0000-0000-000000000000" +exchange = "amq.topic" +routing_keys = ["#"] # Set this to the specific topics you are interested in. + +[consumer_config] +example_key = "for my consumer" + +[qos] +prefetch_size = 0 +prefetch_count = 25 + +[log_config] +version = 1 +disable_existing_loggers = true + +[log_config.formatters.simple] +format = "[%(levelname)s %(name)s] %(message)s" + +[log_config.handlers.console] +class = "logging.StreamHandler" +formatter = "simple" +stream = "ext://sys.stdout" + +[log_config.loggers.fedora_messaging] +level = "INFO" +propagate = false +handlers = ["console"] + +[log_config.loggers.twisted] +level = "INFO" +propagate = false +handlers = ["console"] + +[log_config.loggers.pika] +level = "WARNING" +propagate = false +handlers = ["console"] + +# If your consumer sets up a logger, you must add a configuration for it +# here in order for the messages to show up. e.g. if it set up a logger +# called 'example_printer', you could do: +#[log_config.loggers.example_printer] +#level = "INFO" +#propagate = false +#handlers = ["console"] + +[log_config.root] +level = "ERROR" +handlers = ["console"]