Skip to content

Commit

Permalink
Use chili lib to parse config
Browse files Browse the repository at this point in the history
  • Loading branch information
githuib committed Feb 17, 2024
1 parent ac02685 commit df21344
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 28 deletions.
28 changes: 27 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "powerchord"
version = "0.0.6"
version = "0.0.7"
description = "Concurrent CLI task runner"
authors = ["Huib Piguillet <[email protected]>"]
maintainers = ["Huib Piguillet"]
Expand All @@ -21,6 +21,7 @@ powerchord = "powerchord.runner:run_tasks"

[tool.poetry.dependencies]
python = ">=3.11,<4.0"
chili = "*"

[tool.poetry.group.qa.dependencies]
mypy = "*"
Expand Down
55 changes: 31 additions & 24 deletions src/powerchord/runner.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import asyncio
import sys
import tomllib
from collections.abc import Sequence
from dataclasses import dataclass, field
from enum import StrEnum
from enum import Enum
from pathlib import Path

from chili.decoder import decode

from .formatting import bright, dim, status
from .utils import exec_command, timed_awaitable
from .utils import concurrent_call, exec_command, timed_awaitable


class BoredomError(Exception):
Expand All @@ -20,61 +21,67 @@ class ConfigError(Exception):
message: str


class Output(StrEnum):
class Output(Enum):
OUT = 'info'
ERR = 'error'


@dataclass
class Verbosity:
success: Sequence[Output] = field(default_factory=list)
fail: Sequence[Output] = field(default_factory=lambda: [Output.OUT, Output.ERR])
success: list[Output] = field(default_factory=list)
fail: list[Output] = field(default_factory=lambda: [Output.OUT, Output.ERR])

def should_output(self, out: Output, success: bool):
return (out in self.success) if success else (out in self.fail)


class TaskRunner:
def __init__(self, tasks: dict[str, str], verbosity: Verbosity):
if not tasks:
@dataclass
class Config:
tasks: dict[str, str] = field(default_factory=dict)
verbosity: Verbosity = field(default_factory=lambda: Verbosity())

def __post_init__(self):
if not self.tasks:
raise BoredomError
self.tasks = tasks
self.verbosity = verbosity
self.max_name_length = max(len(n) for n in tasks)


class TaskRunner:
def __init__(self, config: Config) -> None:
self.config = config
self.max_name_length = max(len(n) for n in config.tasks)

@classmethod
def with_pyproject_config(cls) -> 'TaskRunner':
pyproject_file = Path('pyproject.toml')
try:
with pyproject_file.open('rb') as f:
config = tomllib.load(f).get('tool', {}).get('powerchord', {})
config_dict = tomllib.load(f).get('tool', {}).get('powerchord', {})
except OSError as exc:
raise ConfigError(pyproject_file, str(exc)) from exc
if not config:
raise ConfigError(pyproject_file, 'Could not find any [tool.powerchord(.*)] entries')
return cls(config.get('tasks', {}), Verbosity(**config.get('verbosity', {})))
try:
config = decode(config_dict, Config)
except ValueError as exc:
raise ConfigError(pyproject_file, str(exc)) from exc
return cls(config)

async def run_task(self, name: str, task: str) -> tuple[str, bool]:
(success, out, err), duration = await timed_awaitable(exec_command(task))
sys.stdout.write(f'{status(success)} {name.ljust(self.max_name_length)} {dim(duration)}\n')
if self.verbosity.should_output(Output.OUT, success):
if self.config.verbosity.should_output(Output.OUT, success):
sys.stdout.buffer.write(out)
sys.stdout.buffer.flush()
if self.verbosity.should_output(Output.ERR, success):
if self.config.verbosity.should_output(Output.ERR, success):
sys.stderr.buffer.write(err)
sys.stderr.buffer.flush()
return name, success

async def run_tasks(self) -> list[tuple[str, bool]]:
tasks = self.config.tasks.items()
sys.stdout.write(bright('To do:\n'))
for name, task in self.tasks.items():
for name, task in tasks:
sys.stdout.write(f'• {name.ljust(self.max_name_length)} {dim(task)}\n')
sys.stdout.write(bright('\nResults:\n'))
futures = [
asyncio.create_task(self.run_task(name, task))
for name, task in self.tasks.items()
]
return [await f for f in futures]
return await concurrent_call(self.run_task, tasks)


def fail_with(*lines: str) -> None:
Expand Down
26 changes: 24 additions & 2 deletions src/powerchord/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
from collections.abc import Awaitable, Callable
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine, Iterable
from subprocess import PIPE
from time import perf_counter_ns
from typing import TypeVar
from typing import Any, ParamSpec, TypeVar

P = ParamSpec('P')
T = TypeVar('T')


Expand All @@ -13,6 +14,27 @@ async def exec_command(command: str) -> tuple[bool, bytes, bytes]:
return proc.returncode == 0, out, err


async def concurrent_iter(
coros: Iterable[Coroutine[Any, Any, T]],
) -> AsyncIterator[T]:
tasks: list[asyncio.Task[T]] = [asyncio.create_task(coro) for coro in coros]
for task in tasks:
yield await task


async def concurrent_list(
coros: Iterable[Coroutine[Any, Any, T]],
) -> list[T]:
return [item async for item in concurrent_iter(coros)]


async def concurrent_call(
async_func: Callable[P, Coroutine[Any, Any, T]],
args_list: Iterable[P.args],
) -> list[T]:
return await concurrent_list(async_func(*args) for args in args_list)


def human_readable_duration(nanoseconds: int) -> str:
minutes = int(nanoseconds // 60_000_000_000)
nanoseconds %= 60_000_000_000
Expand Down

0 comments on commit df21344

Please sign in to comment.