diff --git a/tools/who_what_benchmark/tests/test_cli_image.py b/tools/who_what_benchmark/tests/test_cli_image.py
index b2c2015f80..536d015612 100644
--- a/tools/who_what_benchmark/tests/test_cli_image.py
+++ b/tools/who_what_benchmark/tests/test_cli_image.py
@@ -20,6 +20,8 @@ def run_wwb(args):
 @pytest.mark.parametrize(
     ("model_id", "model_type", "backend"),
     [
+        ("hf-internal-testing/tiny-stable-diffusion-torch", "image-to-image", "hf"),
+        ("hf-internal-testing/tiny-stable-diffusion-xl-pipe", "image-to-image", "hf"),
         ("hf-internal-testing/tiny-stable-diffusion-torch", "text-to-image", "hf"),
         ("hf-internal-testing/tiny-stable-diffusion-torch", "text-to-image", "openvino"),
         ("hf-internal-testing/tiny-stable-diffusion-xl-pipe", "text-to-image", "hf"),
@@ -40,6 +42,8 @@ def test_image_model_types(model_id, model_type, backend):
             "CPU",
             "--model-type",
             model_type,
+            "--num-inference-steps",
+            "2",
         ]
         if backend == "hf":
             wwb_args.append("--hf")
@@ -65,7 +69,8 @@ def test_image_model_types(model_id, model_type, backend):
 @pytest.mark.parametrize(
     ("model_id", "model_type"),
     [
-        ("echarlaix/tiny-random-stable-diffusion-xl", "text-to-image"),
+        ("OpenVINO/LCM_Dreamshaper_v7-int8-ov", "image-to-image"),
+        ("OpenVINO/LCM_Dreamshaper_v7-int8-ov", "text-to-image"),
     ],
 )
 def test_image_model_genai(model_id, model_type):
@@ -73,15 +78,15 @@ def test_image_model_genai(model_id, model_type):
         GT_FILE = os.path.join(temp_dir, "gt.csv")
         MODEL_PATH = os.path.join(temp_dir, model_id.replace("/", "--"))
 
-        result = subprocess.run(["optimum-cli", "export",
-                                 "openvino", "-m", model_id,
+        result = subprocess.run(["huggingface-cli", "download",
+                                 model_id, "--local-dir",
                                  MODEL_PATH],
                                 capture_output=True, text=True)
         assert result.returncode == 0
 
         wwb_args = [
             "--base-model",
-            MODEL_PATH,
+            model_id,
             "--num-samples",
             "1",
             "--gt-data",
@@ -90,6 +95,8 @@ def test_image_model_genai(model_id, model_type):
             "CPU",
             "--model-type",
             model_type,
+            "--num-inference-steps",
+            "2",
         ]
         result = run_wwb(wwb_args)
         assert result.returncode == 0
@@ -108,6 +115,8 @@ def test_image_model_genai(model_id, model_type):
             "--model-type",
             model_type,
             "--genai",
+            "--num-inference-steps",
+            "2",
         ]
 
         result = run_wwb(wwb_args)
@@ -131,6 +140,9 @@ def test_image_model_genai(model_id, model_type):
             model_type,
             "--output",
             output_dir,
+            "--genai",
+            "--num-inference-steps",
+            "2",
         ]
         result = run_wwb(wwb_args)
         assert result.returncode == 0
@@ -149,6 +161,8 @@ def test_image_model_genai(model_id, model_type):
             "CPU",
             "--model-type",
             model_type,
+            "--num-inference-steps",
+            "2",
         ]
         result = run_wwb(wwb_args)
         assert result.returncode == 0
@@ -182,6 +196,8 @@ def test_image_custom_dataset(model_id, model_type, backend):
             "google-research-datasets/conceptual_captions",
             "--dataset-field",
             "caption",
+            "--num-inference-steps",
+            "2",
         ]
         if backend == "hf":
             wwb_args.append("--hf")
diff --git a/tools/who_what_benchmark/whowhatbench/__init__.py b/tools/who_what_benchmark/whowhatbench/__init__.py
index 278db2c6a1..f608601ec8 100644
--- a/tools/who_what_benchmark/whowhatbench/__init__.py
+++ b/tools/who_what_benchmark/whowhatbench/__init__.py
@@ -3,6 +3,7 @@
 from .text_evaluator import TextEvaluator as Evaluator
 from .text2image_evaluator import Text2ImageEvaluator
 from .visualtext_evaluator import VisualTextEvaluator
+from .image2image import Image2ImageEvaluator
 
 
 __all__ = [
@@ -11,5 +12,6 @@
     "TextEvaluator",
     "Text2ImageEvaluator",
     "VisualTextEvaluator",
+    "Image2ImageEvaluator",
     "EVALUATOR_REGISTRY",
 ]
diff --git a/tools/who_what_benchmark/whowhatbench/image2image.py b/tools/who_what_benchmark/whowhatbench/image2image.py
new file mode 100644
index 0000000000..90eb6c7c87
--- /dev/null
+++ b/tools/who_what_benchmark/whowhatbench/image2image.py
@@ -0,0 +1,129 @@
+import os
+from typing import Any, Union
+
+import datasets
+import pandas as pd
+from tqdm import tqdm
+from transformers import set_seed
+import torch
+import openvino_genai
+
+from .registry import register_evaluator
+from .text2image_evaluator import Text2ImageEvaluator
+
+from .whowhat_metrics import ImageSimilarity
+
+
+def preprocess_fn(example):
+    return {
+        "prompts": example["Instruction_VLM-LLM"],
+        "images": example["source_img"],
+    }
+
+
+def prepare_default_data(num_samples=None):
+    DATASET_NAME = "paint-by-inpaint/PIPE"
+    NUM_SAMPLES = 10 if num_samples is None else num_samples
+    set_seed(42)
+    default_dataset = datasets.load_dataset(
+        DATASET_NAME, split="test", streaming=True
+    ).filter(lambda example: example["Instruction_VLM-LLM"] != "").take(NUM_SAMPLES)
+    return default_dataset.map(
+        lambda x: preprocess_fn(x), remove_columns=default_dataset.column_names
+    )
+
+
+@register_evaluator("image-to-image")
+class Image2ImageEvaluator(Text2ImageEvaluator):
+    def __init__(
+        self,
+        base_model: Any = None,
+        gt_data: str = None,
+        test_data: Union[str, list] = None,
+        metrics="similarity",
+        similarity_model_id: str = "openai/clip-vit-large-patch14",
+        num_inference_steps=4,
+        crop_prompts=True,
+        num_samples=None,
+        gen_image_fn=None,
+        seed=42,
+        is_genai=False,
+    ) -> None:
+        assert (
+            base_model is not None or gt_data is not None
+        ), "Text generation pipeline for evaluation or ground trush data must be defined"
+
+        self.test_data = test_data
+        self.metrics = metrics
+        self.crop_prompt = crop_prompts
+        self.num_samples = num_samples
+        self.num_inference_steps = num_inference_steps
+        self.seed = seed
+        self.similarity = None
+        self.similarity = ImageSimilarity(similarity_model_id)
+        self.last_cmp = None
+        self.gt_dir = os.path.dirname(gt_data)
+        self.generation_fn = gen_image_fn
+        self.is_genai = is_genai
+        self.resolution = None
+
+        if base_model:
+            self.gt_data = self._generate_data(
+                base_model, gen_image_fn, os.path.join(self.gt_dir, "reference")
+            )
+        else:
+            self.gt_data = pd.read_csv(gt_data, keep_default_na=False)
+
+    def _generate_data(self, model, gen_image_fn=None, image_dir="reference"):
+        def default_gen_image_fn(model, prompt, image, num_inference_steps, generator=None):
+            with torch.no_grad():
+                output = model(
+                    prompt,
+                    image=image,
+                    num_inference_steps=num_inference_steps,
+                    output_type="pil",
+                    strength=0.8,
+                    generator=generator,
+                )
+            return output.images[0]
+
+        generation_fn = gen_image_fn or default_gen_image_fn
+
+        if self.test_data:
+            if isinstance(self.test_data, str):
+                data = pd.read_csv(self.test_data)
+            else:
+                if isinstance(self.test_data, dict):
+                    assert "prompts" in self.test_data
+                    assert "images" in self.test_data
+                    data = dict(self.test_data)
+                data = pd.DataFrame.from_dict(data)
+        else:
+            data = pd.DataFrame.from_dict(prepare_default_data(self.num_samples))
+
+        prompts = data["prompts"]
+        images = data["images"]
+        output_images = []
+        rng = torch.Generator(device="cpu")
+
+        if not os.path.exists(image_dir):
+            os.makedirs(image_dir)
+
+        for i, (prompt, image) in tqdm(enumerate(zip(prompts, images)), desc="Evaluate pipeline"):
+            set_seed(self.seed)
+            rng = rng.manual_seed(self.seed)
+            output = generation_fn(
+                model,
+                prompt,
+                image=image,
+                num_inference_steps=self.num_inference_steps,
+                generator=openvino_genai.TorchGenerator(self.seed) if self.is_genai else rng
+            )
+            image_path = os.path.join(image_dir, f"{i}.png")
+            output.save(image_path)
+            output_images.append(image_path)
+
+        res_data = {"prompts": list(prompts), "images": output_images}
+        df = pd.DataFrame(res_data)
+
+        return df
diff --git a/tools/who_what_benchmark/whowhatbench/model_loaders.py b/tools/who_what_benchmark/whowhatbench/model_loaders.py
new file mode 100644
index 0000000000..f54d232bc2
--- /dev/null
+++ b/tools/who_what_benchmark/whowhatbench/model_loaders.py
@@ -0,0 +1,252 @@
+import logging
+import json
+
+from transformers import AutoConfig, AutoModelForCausalLM, AutoModel, AutoModelForVision2Seq
+from diffusers import DiffusionPipeline, AutoPipelineForImage2Image
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class GenAIModelWrapper:
+    """
+    A helper class to store additional attributes for GenAI models
+    """
+
+    def __init__(self, model, model_dir, model_type):
+        self.model = model
+        self.model_type = model_type
+
+        if model_type == "text" or model_type == "visual-text":
+            self.config = AutoConfig.from_pretrained(model_dir, trust_remote_code=True)
+        elif model_type == "text-to-image":
+            self.config = DiffusionPipeline.load_config(
+                model_dir, trust_remote_code=True)
+
+    def __getattr__(self, attr):
+        if attr in self.__dict__:
+            return getattr(self, attr)
+        else:
+            return getattr(self.model, attr)
+
+
+def load_text_genai_pipeline(model_dir, device="CPU", ov_config=None):
+    try:
+        import openvino_genai
+    except ImportError:
+        logger.error(
+            "Failed to import openvino_genai package. Please install it.")
+        exit(-1)
+    return GenAIModelWrapper(openvino_genai.LLMPipeline(model_dir, device=device, **ov_config), model_dir, "text")
+
+
+def load_text_model(
+    model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
+):
+    if use_hf:
+        logger.info("Using HF Transformers API")
+        model = AutoModelForCausalLM.from_pretrained(
+            model_id, trust_remote_code=True, device_map=device.lower()
+        )
+        model.eval()
+    elif use_genai:
+        logger.info("Using OpenVINO GenAI API")
+        model = load_text_genai_pipeline(model_id, device, ov_config)
+    else:
+        logger.info("Using Optimum API")
+        from optimum.intel.openvino import OVModelForCausalLM
+        try:
+            model = OVModelForCausalLM.from_pretrained(
+                model_id, trust_remote_code=True, device=device, ov_config=ov_config
+            )
+        except ValueError:
+            config = AutoConfig.from_pretrained(
+                model_id, trust_remote_code=True)
+            model = OVModelForCausalLM.from_pretrained(
+                model_id,
+                config=config,
+                trust_remote_code=True,
+                use_cache=True,
+                device=device,
+                ov_config=ov_config,
+            )
+
+    return model
+
+
+def load_text2image_genai_pipeline(model_dir, device="CPU", ov_config=None):
+    try:
+        import openvino_genai
+    except ImportError:
+        logger.error(
+            "Failed to import openvino_genai package. Please install it.")
+        exit(-1)
+
+    return GenAIModelWrapper(
+        openvino_genai.Text2ImagePipeline(model_dir, device=device, **ov_config),
+        model_dir,
+        "text-to-image"
+    )
+
+
+def load_text2image_model(
+    model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
+):
+    if use_genai:
+        logger.info("Using OpenvINO GenAI API")
+        model = load_text2image_genai_pipeline(model_id, device, ov_config)
+    elif use_hf:
+        logger.info("Using HF Transformers API")
+        model = DiffusionPipeline.from_pretrained(
+            model_id, trust_remote_code=True)
+    else:
+        logger.info("Using Optimum API")
+        from optimum.intel import OVPipelineForText2Image
+        TEXT2IMAGEPipeline = OVPipelineForText2Image
+
+        try:
+            model = TEXT2IMAGEPipeline.from_pretrained(
+                model_id, trust_remote_code=True, device=device, ov_config=ov_config
+            )
+        except ValueError:
+            config = AutoConfig.from_pretrained(
+                model_id, trust_remote_code=True)
+            model = TEXT2IMAGEPipeline.from_pretrained(
+                model_id,
+                config=config,
+                trust_remote_code=True,
+                use_cache=True,
+                device=device,
+                ov_config=ov_config,
+            )
+
+    return model
+
+
+def load_visual_text_genai_pipeline(model_dir, device="CPU", ov_config=None):
+    try:
+        import openvino_genai
+    except ImportError as e:
+        logger.error("Failed to import openvino_genai package. Please install it. Details:\n", e)
+        exit(-1)
+
+    return GenAIModelWrapper(
+        openvino_genai.VLMPipeline(model_dir, device, **ov_config),
+        model_dir,
+        "visual-text"
+    )
+
+
+def load_visual_text_model(
+    model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
+):
+    if use_hf:
+        logger.info("Using HF Transformers API")
+        config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
+        try:
+            model = AutoModelForVision2Seq.from_pretrained(
+                model_id, trust_remote_code=True, device_map=device.lower()
+            )
+        except ValueError:
+            try:
+                model = AutoModel.from_pretrained(
+                    model_id, trust_remote_code=True, device_map=device.lower()
+                )
+            except ValueError:
+                model = AutoModelForCausalLM.from_pretrained(
+                    model_id, trust_remote_code=True, device_map=device.lower(), _attn_implementation="eager", use_flash_attention_2=False
+                )
+        model.eval()
+    elif use_genai:
+        logger.info("Using OpenVINO GenAI API")
+        model = load_visual_text_genai_pipeline(model_id, device, ov_config)
+    else:
+        logger.info("Using Optimum API")
+        from optimum.intel.openvino import OVModelForVisualCausalLM
+        try:
+            model = OVModelForVisualCausalLM.from_pretrained(
+                model_id, trust_remote_code=True, device=device, ov_config=ov_config
+            )
+        except ValueError:
+            config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
+            model = OVModelForVisualCausalLM.from_pretrained(
+                model_id,
+                config=config,
+                trust_remote_code=True,
+                use_cache=True,
+                device=device,
+                ov_config=ov_config,
+            )
+    return model
+
+
+def load_image2image_genai_pipeline(model_dir, device="CPU", ov_config=None):
+    try:
+        import openvino_genai
+    except ImportError as e:
+        logger.error("Failed to import openvino_genai package. Please install it. Details:\n", e)
+        exit(-1)
+
+    return GenAIModelWrapper(
+        openvino_genai.Image2ImagePipeline(model_dir, device, **ov_config),
+        model_dir,
+        "image-to-image"
+    )
+
+
+def load_imagetext2image_model(
+    model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
+):
+    if use_hf:
+        logger.info("Using HF Transformers API")
+        model = AutoPipelineForImage2Image.from_pretrained(
+            model_id, trust_remote_code=True
+        )
+    elif use_genai:
+        logger.info("Using OpenVINO GenAI API")
+        model = load_image2image_genai_pipeline(model_id, device, ov_config)
+    else:
+        logger.info("Using Optimum API")
+        from optimum.intel.openvino import OVPipelineForImage2Image
+        try:
+            model = OVPipelineForImage2Image.from_pretrained(
+                model_id, trust_remote_code=True, device=device, ov_config=ov_config
+            )
+        except ValueError:
+            config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
+            model = OVPipelineForImage2Image.from_pretrained(
+                model_id,
+                config=config,
+                trust_remote_code=True,
+                use_cache=True,
+                device=device,
+                ov_config=ov_config,
+            )
+    return model
+
+
+def load_model(
+    model_type, model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
+):
+    if model_id is None:
+        return None
+
+    if ov_config:
+        with open(ov_config) as f:
+            ov_options = json.load(f)
+    else:
+        ov_options = {}
+
+    if model_type == "text":
+        return load_text_model(model_id, device, ov_options, use_hf, use_genai)
+    elif model_type == "text-to-image":
+        return load_text2image_model(
+            model_id, device, ov_options, use_hf, use_genai
+        )
+    elif model_type == "visual-text":
+        return load_visual_text_model(model_id, device, ov_options, use_hf, use_genai)
+    elif model_type == "image-to-image":
+        return load_imagetext2image_model(model_id, device, ov_options, use_hf, use_genai)
+    else:
+        raise ValueError(f"Unsupported model type: {model_type}")
diff --git a/tools/who_what_benchmark/whowhatbench/text2image_evaluator.py b/tools/who_what_benchmark/whowhatbench/text2image_evaluator.py
index 0cced117e4..e930c48b0a 100644
--- a/tools/who_what_benchmark/whowhatbench/text2image_evaluator.py
+++ b/tools/who_what_benchmark/whowhatbench/text2image_evaluator.py
@@ -116,14 +116,15 @@ def worst_examples(self, top_k: int = 5, metric="similarity"):
 
     def _generate_data(self, model, gen_image_fn=None, image_dir="reference"):
         def default_gen_image_fn(model, prompt, num_inference_steps, generator=None):
-            output = model(
-                prompt,
-                num_inference_steps=num_inference_steps,
-                output_type="pil",
-                width=self.resolution[0],
-                height=self.resolution[0],
-                generator=generator,
-            )
+            with torch.no_grad():
+                output = model(
+                    prompt,
+                    num_inference_steps=num_inference_steps,
+                    output_type="pil",
+                    width=self.resolution[0],
+                    height=self.resolution[0],
+                    generator=generator,
+                )
             return output.images[0]
 
         generation_fn = gen_image_fn or default_gen_image_fn
diff --git a/tools/who_what_benchmark/whowhatbench/wwb.py b/tools/who_what_benchmark/whowhatbench/wwb.py
index 04813f5fd8..2ff8c45975 100644
--- a/tools/who_what_benchmark/whowhatbench/wwb.py
+++ b/tools/who_what_benchmark/whowhatbench/wwb.py
@@ -1,18 +1,17 @@
 import argparse
 import difflib
 import numpy as np
-import json
 import logging
 import os
 
-from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer, AutoProcessor, AutoModel, AutoModelForVision2Seq
+from transformers import AutoTokenizer, AutoProcessor
 import openvino as ov
 import pandas as pd
 from datasets import load_dataset
-from diffusers import DiffusionPipeline
 from PIL import Image
 
+from whowhatbench.model_loaders import load_model
 from whowhatbench import EVALUATOR_REGISTRY
 
 
 # Configure logging
@@ -20,224 +19,6 @@
 logger = logging.getLogger(__name__)
 
 
-class GenAIModelWrapper:
-    """
-    A helper class to store additional attributes for GenAI models
-    """
-
-    def __init__(self, model, model_dir, model_type):
-        self.model = model
-        self.model_type = model_type
-
-        if model_type == "text" or model_type == "visual-text":
-            self.config = AutoConfig.from_pretrained(model_dir, trust_remote_code=True)
-        elif model_type == "text-to-image":
-            self.config = DiffusionPipeline.load_config(
-                model_dir, trust_remote_code=True)
-
-    def __getattr__(self, attr):
-        if attr in self.__dict__:
-            return getattr(self, attr)
-        else:
-            return getattr(self.model, attr)
-
-
-def load_text_genai_pipeline(model_dir, device="CPU", ov_config=None):
-    try:
-        import openvino_genai
-    except ImportError:
-        logger.error(
-            "Failed to import openvino_genai package. Please install it.")
-        exit(-1)
-    return GenAIModelWrapper(openvino_genai.LLMPipeline(model_dir, device=device, **ov_config), model_dir, "text")
-
-
-def load_text_model(
-    model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
-):
-    if use_hf:
-        logger.info("Using HF Transformers API")
-        model = AutoModelForCausalLM.from_pretrained(
-            model_id, trust_remote_code=True, device_map=device.lower()
-        )
-        model.eval()
-    elif use_genai:
-        logger.info("Using OpenVINO GenAI API")
-        model = load_text_genai_pipeline(model_id, device, ov_config)
-    else:
-        logger.info("Using Optimum API")
-        from optimum.intel.openvino import OVModelForCausalLM
-        try:
-            model = OVModelForCausalLM.from_pretrained(
-                model_id, trust_remote_code=True, device=device, ov_config=ov_config
-            )
-        except ValueError:
-            config = AutoConfig.from_pretrained(
-                model_id, trust_remote_code=True)
-            model = OVModelForCausalLM.from_pretrained(
-                model_id,
-                config=config,
-                trust_remote_code=True,
-                use_cache=True,
-                device=device,
-                ov_config=ov_config,
-            )
-
-    return model
-
-
-def load_text2image_genai_pipeline(model_dir, device="CPU", ov_config=None):
-    try:
-        import openvino_genai
-    except ImportError:
-        logger.error(
-            "Failed to import openvino_genai package. Please install it.")
-        exit(-1)
-
-    return GenAIModelWrapper(
-        openvino_genai.Text2ImagePipeline(model_dir, device=device, **ov_config),
-        model_dir,
-        "text-to-image"
-    )
-
-
-def load_text2image_model(
-    model_type, model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
-):
-    if use_genai:
-        logger.info("Using OpenvINO GenAI API")
-        model = load_text2image_genai_pipeline(model_id, device, ov_config)
-    elif use_hf:
-        logger.info("Using HF Transformers API")
-        model = DiffusionPipeline.from_pretrained(
-            model_id, trust_remote_code=True)
-    else:
-        logger.info("Using Optimum API")
-        from optimum.intel import OVPipelineForText2Image
-        TEXT2IMAGEPipeline = OVPipelineForText2Image
-
-        try:
-            model = TEXT2IMAGEPipeline.from_pretrained(
-                model_id, trust_remote_code=True, device=device, ov_config=ov_config
-            )
-        except ValueError:
-            config = AutoConfig.from_pretrained(
-                model_id, trust_remote_code=True)
-            model = TEXT2IMAGEPipeline.from_pretrained(
-                model_id,
-                config=config,
-                trust_remote_code=True,
-                use_cache=True,
-                device=device,
-                ov_config=ov_config,
-            )
-
-    return model
-
-
-def load_visual_text_genai_pipeline(model_dir, device="CPU", ov_config=None):
-    try:
-        import openvino_genai
-    except ImportError as e:
-        logger.error("Failed to import openvino_genai package. Please install it. Details:\n", e)
-        exit(-1)
-
-    return GenAIModelWrapper(
-        openvino_genai.VLMPipeline(model_dir, device, **ov_config),
-        model_dir,
-        "visual-text"
-    )
-
-
-def load_visual_text_model(
-    model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
-):
-    if use_hf:
-        logger.info("Using HF Transformers API")
-        config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
-        try:
-            model = AutoModelForVision2Seq.from_pretrained(
-                model_id, trust_remote_code=True, device_map=device.lower()
-            )
-        except ValueError:
-            try:
-                model = AutoModel.from_pretrained(
-                    model_id, trust_remote_code=True, device_map=device.lower()
-                )
-            except ValueError:
-                model = AutoModelForCausalLM.from_pretrained(
-                    model_id, trust_remote_code=True, device_map=device.lower(), _attn_implementation="eager", use_flash_attention_2=False
-                )
-        model.eval()
-    elif use_genai:
-        logger.info("Using OpenVINO GenAI API")
-        model = load_visual_text_genai_pipeline(model_id, device, ov_config)
-    else:
-        logger.info("Using Optimum API")
-        from optimum.intel.openvino import OVModelForVisualCausalLM
-        try:
-            model = OVModelForVisualCausalLM.from_pretrained(
-                model_id, trust_remote_code=True, device=device, ov_config=ov_config
-            )
-        except ValueError:
-            config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
-            model = OVModelForVisualCausalLM.from_pretrained(
-                model_id,
-                config=config,
-                trust_remote_code=True,
-                use_cache=True,
-                device=device,
-                ov_config=ov_config,
-            )
-    return model
-
-
-def load_model(
-    model_type, model_id, device="CPU", ov_config=None, use_hf=False, use_genai=False
-):
-    if model_id is None:
-        return None
-
-    if ov_config:
-        with open(ov_config) as f:
-            ov_options = json.load(f)
-    else:
-        ov_options = {}
-
-    if model_type == "text":
-        return load_text_model(model_id, device, ov_options, use_hf, use_genai)
-    elif model_type == "text-to-image":
-        return load_text2image_model(
-            model_type, model_id, device, ov_options, use_hf, use_genai
-        )
-    elif model_type == "visual-text":
-        return load_visual_text_model(model_id, device, ov_options, use_hf, use_genai)
-    else:
-        raise ValueError(f"Unsupported model type: {model_type}")
-
-
-def load_prompts(args):
-    if args.dataset is None:
-        return None
-    split = "validation"
-    if args.split is not None:
-        split = args.split
-    if "," in args.dataset:
-        path_name = args.dataset.split(",")
-        path = path_name[0]
-        name = path_name[1]
-    else:
-        path = args.dataset
-        name = None
-    data = load_dataset(path=path, name=name, split=split)
-
-    res = data[args.dataset_field]
-
-    res = {"prompts": list(res)}
-
-    return res
-
-
 def parse_args():
     parser = argparse.ArgumentParser(
         prog="WWB CLI",
@@ -274,9 +55,10 @@ def parse_args():
     parser.add_argument(
         "--model-type",
         type=str,
-        choices=["text", "text-to-image", "visual-text"],
+        choices=["text", "text-to-image", "visual-text", "image-to-image"],
         default="text",
-        help="Indicated the model type: 'text' - for causal text generation, 'text-to-image' - for image generation.",
+        help="Indicated the model type: 'text' - for causal text generation, 'text-to-image' - for image generation, "
+        "visual-text - for Visual Language Models, image-to-image - for image generation based on image and prompt",
     )
     parser.add_argument(
         "--data-encoder",
@@ -385,6 +167,26 @@ def check_args(args):
             "Wether --target-model, --target-data or --gt-data should be provided")
 
 
+def load_prompts(args):
+    if args.dataset is None:
+        return None
+    split = "validation"
+    if args.split is not None:
+        split = args.split
+    if "," in args.dataset:
+        path_name = args.dataset.split(",")
+        path = path_name[0]
+        name = path_name[1]
+    else:
+        path = args.dataset
+        name = None
+    data = load_dataset(path=path, name=name, split=split)
+
+    res = data[args.dataset_field]
+    res = {"prompts": list(res)}
+    return res
+
+
 def load_tokenizer(args):
     tokenizer = None
     if args.tokenizer is not None:
@@ -449,7 +251,7 @@ def genai_gen_text(model, tokenizer, question, max_new_tokens, skip_question):
 
 
 def genai_gen_image(model, prompt, num_inference_steps, generator=None):
-    if model.resolution[0] is not None:
+    if model.resolution is not None and model.resolution[0] is not None:
         image_tensor = model.generate(
             prompt,
             width=model.resolution[0],
@@ -467,8 +269,21 @@ def genai_gen_image(model, prompt, num_inference_steps, generator=None):
     return image
 
 
+def genai_gen_image2image(model, prompt, image, num_inference_steps, generator=None):
+    image_data = ov.Tensor(np.array(image.getdata()).reshape(1, image.size[1], image.size[0], 3).astype(np.uint8))
+    image_tensor = model.generate(
+        prompt,
+        image=image_data,
+        num_inference_steps=num_inference_steps,
+        strength=0.8,
+        generator=generator,
+    )
+    image = Image.fromarray(image_tensor.data[0])
+    return image
+
+
 def genai_gen_visual_text(model, prompt, image, processor, tokenizer, max_new_tokens, crop_question):
-    image_data = ov.Tensor(np.array(image.getdata()).reshape(1, image.size[1], image.size[0], 3).astype(np.byte))
+    image_data = ov.Tensor(np.array(image.getdata()).reshape(1, image.size[1], image.size[0], 3).astype(np.uint8))
     config = model.get_generation_config()
     config.max_new_tokens = max_new_tokens
     config.do_sample = False
@@ -529,6 +344,17 @@ def create_evaluator(base_model, args):
             gen_answer_fn=genai_gen_visual_text if args.genai else None,
             processor=processor,
         )
+    elif task == "image-to-image":
+        return EvaluatorCLS(
+            base_model=base_model,
+            gt_data=args.gt_data,
+            test_data=prompts,
+            num_samples=args.num_samples,
+            num_inference_steps=args.num_inference_steps,
+            gen_image_fn=genai_gen_image2image if args.genai else None,
+            is_genai=args.genai,
+            seed=args.seed,
+        )
     else:
         raise ValueError(f"Unsupported task: {task}")
 
@@ -637,7 +463,7 @@ def main():
     if args.verbose and (args.target_model or args.target_data):
         if args.model_type == "text" or args.model_type == "visual-text":
             print_text_results(evaluator)
-        elif "text-to-image" in args.model_type:
+        elif "text-to-image" in args.model_type or "image-to-image" in args.model_type:
             print_image_results(evaluator)