Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Display Deploy progress #40

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
26 changes: 24 additions & 2 deletions Golem.Tools/App/app.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import asyncio
import os

from alive_progress import alive_bar
from dataclasses import dataclass
from datetime import datetime

from yapapi import Golem
from yapapi.payload import Payload
from yapapi.props import inf
from yapapi.props.base import constraint, prop
from yapapi.script import ProgressArgs
from yapapi.services import Service
from yapapi.log import enable_default_logger

import json
import argparse
import asyncio
import tempfile
Expand Down Expand Up @@ -170,7 +173,7 @@ async def start(self):
self.strategy.remember(self._ctx.provider_id)

script = self._ctx.new_script(timeout=None)
script.deploy()
script.deploy(progress_args=ProgressArgs(updateInterval="500ms"))
script.start()
yield script

Expand All @@ -182,6 +185,19 @@ def __init__(self, strategy: ProviderOnceStrategy):
self.strategy = strategy


def progress_event_handler(event: "yapapi.events.CommandProgress"):
if event.progress is not None and event.progress[1] is not None:
progress = event.progress
percent = 0.0

if progress[1] is None:
percent = "unknown"
else:
percent = 100.0 * progress[0] / progress[1]

print(f"Deploy progress: {percent}% ({progress[0]} B / {progress[1]} B)")


async def main(subnet_tag, driver=None, network=None, runtime="dummy"):
strategy = ProviderOnceStrategy()
async with Golem(
Expand All @@ -190,7 +206,8 @@ async def main(subnet_tag, driver=None, network=None, runtime="dummy"):
strategy=strategy,
payment_driver=driver,
payment_network=network,
) as golem:
stream_output=True,
) as golem:
AiRuntimeService.runtime = runtime
cluster = await golem.run_service(
AiRuntimeService,
Expand All @@ -200,6 +217,11 @@ async def main(subnet_tag, driver=None, network=None, runtime="dummy"):
num_instances=1,
)

golem.add_event_consumer(progress_event_handler, ["CommandProgress"])

def instances():
return [f"{s.provider_name}: {s.state.value}" for s in cluster.instances]

async def print_usage():
token = golem._engine._api_config.app_key

Expand Down
Binary file modified Golem.Tools/App/requirements.txt
Binary file not shown.
11 changes: 6 additions & 5 deletions Golem.Tools/GolemPackageBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ namespace Golem.Tools
{
public class PackageBuilder
{
public static string CURRENT_GOLEM_VERSION = "pre-rel-v0.14.1-rc8_shutdown";
public static string CURRENT_RUNTIME_VERSION = "pre-rel-v0.1.0-rc26_automatic_shutdown";
public static string CURRENT_GOLEM_VERSION = "pre-rel-v0.15.0-ai-rc1";
public static string CURRENT_RUNTIME_VERSION = "pre-rel-v0.1.0-rc30";

internal static string InitTestDirectory(string name, bool cleanupData = true)
{
Expand Down Expand Up @@ -61,7 +61,7 @@ public async static Task<string> BuildTestDirectory(string test_name)
var system = System();
BuildDirectoryStructure(dir);

await DownloadExtractPackage(BinariesDir(dir), "golem-provider", "pwalski/yagna", CURRENT_GOLEM_VERSION);
await DownloadExtractPackage(BinariesDir(dir), "golem-provider", "golemfactory/yagna", CURRENT_GOLEM_VERSION);

var exeUnitDir = ExeUnitsDir(dir);
await DownloadExtractPackage(exeUnitDir, "runtime", "golemfactory/ya-runtime-ai", CURRENT_RUNTIME_VERSION);
Expand Down Expand Up @@ -105,7 +105,7 @@ public async static Task<string> BuildRequestorDirectoryRelative(string dir, boo
Directory.CreateDirectory(YagnaDataDir(dir));

if (!File.Exists(Path.Combine(BinariesDir(dir), "yagna")))
await DownloadExtractPackage(BinariesDir(dir), "golem-requestor", "pwalski/yagna", CURRENT_GOLEM_VERSION);
await DownloadExtractPackage(BinariesDir(dir), "golem-requestor", "golemfactory/yagna", CURRENT_GOLEM_VERSION);

return dir;
}
Expand All @@ -115,7 +115,8 @@ public static async Task DownloadExtractPackage(string dir, string artifact, str
var downloaded_artifact = await DownloadArchiveArtifact(artifact, tag, repo);

var extract_dir = Path.Combine(dir, "unpack");
if (Path.Exists(extract_dir)) {
if (Path.Exists(extract_dir))
{
Directory.Delete(extract_dir, true);
Directory.CreateDirectory(extract_dir);
}
Expand Down
3 changes: 1 addition & 2 deletions Golem.Tools/SampleApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public override bool Start()
{
var working_dir = Path.Combine(_dir, "modules", "golem-data", "app");
Directory.CreateDirectory(working_dir);

return StartProcess("app", working_dir, $"--network {Network.Goerli.Id} --driver {PaymentDriver.ERC20next.Id} --subnet-tag public {_extraArgs}", _env, true);
return StartProcess("app", working_dir, $"--network {Network.Goerli.Id} --driver {PaymentDriver.ERC20.Id} --subnet-tag public", _env, true);
}
}

Expand Down
7 changes: 5 additions & 2 deletions Golem/Golem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Text.Json;

using Golem.GolemUI.Src;
using Golem.Tools;
using Golem.Yagna;
using Golem.Yagna.Types;

using GolemLib;
using GolemLib.Types;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

Expand Down Expand Up @@ -265,8 +268,8 @@ private async Task<bool> StartupYagnaAsync(YagnaStartupOptions yagnaOptions, Act

try
{
_logger.LogInformation("Init Payment {} {} {}",yagnaOptions.Network, PaymentDriver.ERC20next.Id, account);
Yagna.PaymentService.Init(yagnaOptions.Network, PaymentDriver.ERC20next.Id, account ?? "");
_logger.LogInformation("Init Payment {} {} {}", yagnaOptions.Network, PaymentDriver.ERC20.Id, account);
Yagna.PaymentService.Init(yagnaOptions.Network, PaymentDriver.ERC20.Id, account ?? "");
}
catch (Exception e)
{
Expand Down
91 changes: 88 additions & 3 deletions example/ai-requestor/ai_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
from PIL import Image
import requests

from alive_progress import alive_bar
from dataclasses import dataclass
from datetime import datetime

import yapapi.script.command
from yapapi import Golem
from yapapi.payload import Payload
from yapapi.props import inf
from yapapi.props.base import constraint, prop
from yapapi.services import Service
from yapapi.log import enable_default_logger
from yapapi.config import ApiConfig
from yapapi.script import ProgressArgs

import argparse
import asyncio
Expand Down Expand Up @@ -85,6 +88,69 @@ def print_env_info(golem: Golem):
)


def command_key(event: "yapapi.events.CommandProgress") -> str:
return f"{event.script_id}#{event.command._index}"


class ProgressDisplayer:
def __init__(self):
self._transfers_bars = {}
self._transfers_ctx = {}

def exit(self):
for key, bar in self._transfers_ctx:
bar.__exit__(None, None, None)

def progress_bar(self, event: "yapapi.events.CommandProgress"):
if event.message is not None:
print(f"{event.message}")

if event.progress is not None and event.progress[1] is not None:
progress = event.progress
key = command_key(event)

if self._transfers_ctx.get(key) is None:
self.create_progress_bar(event)

bar = self._transfers_ctx.get(key)
bar(progress[0] / progress[1])

def create_progress_bar(self, event: "yapapi.events.CommandProgress"):
key = command_key(event)
bar = alive_bar(
total=event.progress[1],
manual=True,
title="Progress",
unit=event.unit,
scale=True,
dual_line=True,
)
bar_ctx = bar.__enter__()

command = event.command
if isinstance(command, yapapi.script.command.Deploy):
bar_ctx.text = "Deploying image"
elif isinstance(command, yapapi.script.command._SendContent):
bar_ctx.text = f"Uploading file: {command._src.download_url} -> {command._dst_path}"
elif isinstance(command, yapapi.script.command._ReceiveContent):
bar_ctx.text = f"Downloading file: {command._src_path} -> {command._dst_path}"

self._transfers_bars[key] = bar
self._transfers_ctx[key] = bar_ctx

def executed(self, event: "yapapi.events.CommandExecuted"):
key = command_key(event)
if self._transfers_ctx.get(key) is not None:
bar_obj = self._transfers_bars.get(key)
bar = self._transfers_ctx.get(key)

bar(1.0)
bar_obj.__exit__(None, None, None)

self._transfers_bars.pop(key)
self._transfers_ctx.pop(key)


def run_golem_example(example_main, log_file=None):
# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()
Expand Down Expand Up @@ -152,7 +218,7 @@ def remember(self, provider_id: str):
# App

RUNTIME_NAME = "automatic"
# RUNTIME_NAME = "dummy"
#RUNTIME_NAME = "dummy"

@dataclass
class AiPayload(Payload):
Expand All @@ -168,12 +234,14 @@ async def get_payload():
## TODO switched into using smaller model to avoid problems during tests. Resolve it when automatic runtime integrated
# return AiPayload(image_url="hash:sha3:92180a67d096be309c5e6a7146d89aac4ef900e2bf48a52ea569df7d:https://huggingface.co/stabilityai/stable-diffusion-xl-base-1.0/resolve/main/sd_xl_base_1.0.safetensors?download=true")
# return AiPayload(image_url="hash:sha3:0b682cf78786b04dc108ff0b254db1511ef820105129ad021d2e123a7b975e7c:https://huggingface.co/cointegrated/rubert-tiny2/resolve/main/model.safetensors?download=true")
return AiPayload(image_url="hash:sha3:6ce0161689b3853acaa03779ec93eafe75a02f4ced659bee03f50797806fa2fa:https://huggingface.co/runwayml/stable-diffusion-v1-5/resolve/main/v1-5-pruned-emaonly.safetensors?download=true")
return AiPayload(image_url="hash:sha3:b2da48d618beddab1887739d75b50a3041c810bc73805a416761185998359b24:https://huggingface.co/runwayml/stable-diffusion-v1-5/resolve/main/v1-5-pruned-emaonly.safetensors?download=true")


async def start(self):
self.strategy.remember(self._ctx.provider_id)

script = self._ctx.new_script(timeout=None)
script.deploy()
script.deploy(progress_args=ProgressArgs(updateInterval="500ms"))
script.start()
yield script

Expand All @@ -184,6 +252,7 @@ def __init__(self, strategy: ProviderOnceStrategy):
super().__init__()
self.strategy = strategy


async def trigger(activity: RequestorControlApi, token, prompt, output_file):

custom_url = "/sdapi/v1/txt2img"
Expand Down Expand Up @@ -219,6 +288,7 @@ async def main(subnet_tag, driver=None, network=None):
strategy=strategy,
payment_driver=driver,
payment_network=network,
stream_output=True,
) as golem:
cluster = await golem.run_service(
AiRuntimeService,
Expand All @@ -228,6 +298,21 @@ async def main(subnet_tag, driver=None, network=None):
num_instances=1,
)

bar = ProgressDisplayer()
def progress_event_handler(event: "yapapi.events.CommandProgress"):
bar.progress_bar(event)

def on_shutdown(_event: "yapapi.events.ServiceFinished"):
bar.exit()

def on_command_executed(event: "yapapi.events.CommandExecuted"):
bar.executed(event)

golem.add_event_consumer(progress_event_handler, ["CommandProgress"])
golem.add_event_consumer(on_shutdown, ["ServiceFinished"])
golem.add_event_consumer(on_command_executed, ["CommandExecuted"])


def instances():
return [
{
Expand Down
Loading
Loading