Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refined functions to get packages indices #28

Merged
merged 5 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,21 @@ def __init__(self, *args): # noqa: D107

self._stored.set_default(config={})

@property
def base_path(self) -> Path:
"""Return base-path."""
return Path(self._stored.config["base-path"])

def _on_publish_relation_joined(self, event):
publish_path = "{}/publish".format(self._stored.config["base-path"])
event.relation.data[self.model.unit].update({"path": publish_path})
publish_path = self.base_path / "publish"
event.relation.data[self.model.unit].update({"path": str(publish_path)})

def _update_status(self):
published_snapshot = self._get_published_snapshot()
if published_snapshot:
self.model.unit.status = ActiveStatus("Publishes: {}".format(published_snapshot))
else:
path = self._stored.config["base-path"] + "/mirror"
path = self.base_path / "mirror"
if os.path.isdir(path):
stat = os.stat(path)
self.model.unit.status = BlockedStatus(
Expand Down Expand Up @@ -149,7 +154,7 @@ def _check_packages(self):
# and should not be removed.
indexed_packages = set()
snapshot_paths = self._list_snapshots()
mirror_path = "{}/mirror".format(self._stored.config["base-path"])
mirror_path = self.base_path / "mirror"
for path in snapshot_paths + [mirror_path]:
for base, indices in locate_package_indices(path):
indexed_packages |= set(find_packages_by_indices(indices, base=base))
Expand Down Expand Up @@ -249,7 +254,7 @@ def _on_synchronize_action(self, event):
try:
if mirror_filter is None:
# clean dists only if no filter was applied
clean_dists(Path(self._stored.config["base-path"]))
clean_dists(self.base_path)

logger.info("running apt-mirror for:%s", os.linesep + os.linesep.join(mirrors))
tmp_path = self._create_tmp_apt_mirror_config(*mirrors)
Expand Down Expand Up @@ -278,30 +283,30 @@ def _on_synchronize_action(self, event):
def _on_create_snapshot_action(self, event):
snapshot_name = self._get_snapshot_name()
logger.info("Create snapshot {}".format(snapshot_name))
snapshot_name_path = "{}/{}".format(self._stored.config["base-path"], snapshot_name)
mirror_path = "{}/mirror".format(self._stored.config["base-path"])
snapshot_name_path = self.base_path / snapshot_name
mirror_path = self.base_path / "mirror"
mirrors = self._mirror_names()
if not os.path.exists(snapshot_name_path):
os.makedirs(snapshot_name_path)
for dirpath, dirs, files in os.walk(mirror_path):
if "pool" in dirs:
src_root = dirpath
src_pool = "{}/pool".format(src_root)
src_root = Path(dirpath)
src_pool = src_root / "pool"
subtree = self._build_subtree(mirrors, src_root, mirror_path)
dst_root = "{}/{}".format(snapshot_name_path, subtree)
dst_pool = "{}/pool".format(dst_root)
dst_root = snapshot_name_path / subtree
dst_pool = dst_root / "pool"
os.makedirs(dst_root, exist_ok=True)
os.symlink(src_pool, dst_pool)
logger.info("{} -> {}".format(src_pool, dst_pool))
logger.info("%s -> %s", src_pool, dst_pool)
if "dists" in dirs:
src_root = dirpath
src_dists = "{}/dists".format(src_root)
src_root = Path(dirpath)
src_dists = src_root / "dists"
subtree = self._build_subtree(mirrors, src_root, mirror_path)
dst_root = "{}/{}".format(snapshot_name_path, subtree)
dst_dists = "{}/dists".format(dst_root)
dst_root = snapshot_name_path / subtree
dst_dists = dst_root / "dists"
os.makedirs(dst_root, exist_ok=True)
shutil.copytree(src_dists, dst_dists)
logger.info("{} -> {}".format(src_dists, dst_dists))
logger.info("%s -> %s", src_dists, dst_dists)
self._update_status()

def _on_delete_snapshot_action(self, event):
Expand All @@ -310,11 +315,11 @@ def _on_delete_snapshot_action(self, event):
event.fail("Invalid snapshot name: {}".format(snapshot))
return
logger.info("Delete snapshot {}".format(snapshot))
shutil.rmtree("{}/{}".format(self._stored.config["base-path"], snapshot))
shutil.rmtree(self.base_path / snapshot)
self._update_status()

def _list_snapshots(self):
return list(Path(self._stored.config["base-path"]).glob("snapshot-*"))
return list(self.base_path.glob("snapshot-*"))

def _on_list_snapshots_action(self, event):
snapshots = [p.name for p in self._list_snapshots()]
Expand All @@ -324,8 +329,8 @@ def _on_list_snapshots_action(self, event):
def _on_publish_snapshot_action(self, event):
name = event.params["name"]
logger.info("Publish snapshot {}".format(name))
snapshot_path = "{}/{}".format(self._stored.config["base-path"], name)
publish_path = "{}/publish".format(self._stored.config["base-path"])
snapshot_path = self.base_path / name
publish_path = self.base_path / "publish"
if not os.path.isdir(snapshot_path):
event.fail("Snapshot does not exist")
return
Expand Down Expand Up @@ -376,7 +381,7 @@ def _mirror_names(self):
]

def _get_published_snapshot(self):
publish_path = "{}/publish".format(self._stored.config["base-path"])
publish_path = self.base_path / "publish"
if os.path.islink(publish_path):
return os.path.basename(os.readlink(publish_path))

Expand Down
58 changes: 41 additions & 17 deletions src/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@

import logging
import shutil
from bz2 import open as bopen
from gzip import open as gopen
from lzma import open as lopen
from os import readlink
from pathlib import Path
from typing import Set
from typing import List, Set

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,6 +54,28 @@ def find_packages_by_indices(indices, base=""):
return packages


def _get_archive_root(pool: Path) -> Path:
"""Get archive path from pool path."""
if pool.is_symlink():
# Note(rgildein): using pool.resolve instead of pool.readlink, since readlink was
# introduced in Python 3.9, we know that path exists, so it's safe
return pool.resolve().parent.absolute()

return pool.parent.absolute()


def _get_package_indices(dists: Path) -> List[Path]:
"""Get package indices."""
indices, parents = [], []
# filter files from same parent directory
for path in sorted(dists.glob("**/Packages*")):
if path.parent not in parents:
indices.append(path)
parents.append(path.parent)

return indices


def locate_package_indices(path):
"""Return all locations of the index files.

Expand All @@ -74,16 +92,13 @@ def locate_package_indices(path):
if not path.exists():
raise FileNotFoundError("Invalid path: {}".format(path))

dists = list(path.glob("**/dists"))
pool = [d.parent / "pool" for d in dists]

archive_roots = []
package_indices = []
for p, d in zip(pool, dists):
archive_roots.append(
Path(readlink(p)).parent.absolute() if p.is_symlink() else p.parent.absolute()
)
package_indices.append([sorted(distro.glob("**/Packages*"))[0] for distro in d.glob("*")])
for dists in path.glob("**/dists"):
pool = dists.parent / "pool"
archive_roots.append(_get_archive_root(pool))
package_indices.append(_get_package_indices(dists))

return zip(archive_roots, package_indices)


Expand All @@ -98,7 +113,10 @@ def convert_bytes(num):
def _locate_packages_from_index(package_index, archive_root=""):
"""Return an unique set of path-to-packages found in the index file."""
opener = _get_opener(package_index)
with opener(package_index, mode="r") as f:

# Note (rgildein): We need to use `rt` directly, because `r` for gzip is referring
# to `rb` instead of `rt`.
with opener(package_index, mode="rt") as f:
packages = {
Path(archive_root, line.strip().replace("Filename: ", ""))
for line in f
Expand All @@ -111,10 +129,16 @@ def _get_opener(path):
"""Return appropriate opener to open an index file."""
extension = Path(path).suffix
if extension == ".gz":
return gopen
import gzip

return gzip.open
elif extension == ".bz2":
return bopen
import bz2

return bz2.open
elif extension == ".lzma" or extension == ".xz":
return lopen
import lzma

return lzma.open
else:
return open
Loading