diff --git a/invokeai/app/invocations/image.py b/invokeai/app/invocations/image.py index a551f8df8a4..995179ce1ef 100644 --- a/invokeai/app/invocations/image.py +++ b/invokeai/app/invocations/image.py @@ -16,7 +16,7 @@ WithBoard, WithMetadata, ) -from invokeai.app.invocations.primitives import ImageOutput +from invokeai.app.invocations.primitives import ImageOutput, StringOutput from invokeai.app.services.image_records.image_records_common import ImageCategory from invokeai.app.services.shared.invocation_context import InvocationContext from invokeai.backend.image_util.invisible_watermark import InvisibleWatermark @@ -542,6 +542,25 @@ def invoke(self, context: InvocationContext) -> ImageOutput: return ImageOutput.build(image_dto) +@invocation( + "retrieve_watermark", + title="Retrieve Invisible Watermark", + tags=["image", "watermark"], + category="image", + version="1.2.2", +) +class RetrieveWatermarkInvocation(BaseInvocation): + """Read an invisible watermark from an image""" + + image: ImageField = InputField(description="The image to read the watermark from") + watermark_len: int = InputField(default=8, description="length of watermark, in characters") + + def invoke(self, context: InvocationContext) -> StringOutput: + image = context.images.get_pil(self.image.image_name) + watermark_text = InvisibleWatermark.read_watermark(image, self.watermark_len * 8) + return StringOutput(value=watermark_text) + + @invocation( "mask_edge", title="Mask Edge", diff --git a/invokeai/backend/image_util/invisible_watermark.py b/invokeai/backend/image_util/invisible_watermark.py index 84342e442fc..bfc1dab9881 100644 --- a/invokeai/backend/image_util/invisible_watermark.py +++ b/invokeai/backend/image_util/invisible_watermark.py @@ -6,7 +6,7 @@ import cv2 import numpy as np -from imwatermark import WatermarkEncoder +from imwatermark import WatermarkDecoder, WatermarkEncoder from PIL import Image import invokeai.backend.util.logging as logger @@ -28,3 +28,10 @@ def add_watermark(cls, image: Image.Image, watermark_text: str) -> Image.Image: encoder.set_watermark("bytes", watermark_text.encode("utf-8")) bgr_encoded = encoder.encode(bgr, "dwtDct") return Image.fromarray(cv2.cvtColor(bgr_encoded, cv2.COLOR_BGR2RGB)).convert("RGBA") + + @classmethod + def read_watermark(cls, image: Image.Image, length: int = 64) -> str: + bgr = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR) + decoder = WatermarkDecoder("bytes", length) + watermark = decoder.decode(bgr, "dwtDct") + return watermark.decode("utf-8")