Adding naive implemenation for OV using OVStableDiffusionPipeline

This commit is contained in:
unknown 2024-12-02 16:58:29 -08:00
parent 4c82741b54
commit 9f9fe77a38
3 changed files with 308 additions and 1 deletions

View File

@ -39,6 +39,7 @@ class IsChangedCache:
node = self.dynprompt.get_node(node_id)
class_type = node["class_type"]
class_def = nodes.NODE_CLASS_MAPPINGS[class_type]
print("Class def: ", class_def)
if not hasattr(class_def, "IS_CHANGED"):
self.is_changed[node_id] = False
return self.is_changed[node_id]

View File

@ -7,7 +7,7 @@ import logging
from typing import Set, List, Dict, Tuple, Literal
from collections.abc import Collection
supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl', '.sft'}
supported_pt_extensions: set[str] = {'.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl', '.sft', '.xml'}
folder_names_and_paths: dict[str, tuple[list[str], set[str]]] = {}
@ -39,6 +39,8 @@ folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")]
folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""})
folder_names_and_paths["openvino"] = ([os.path.join(models_dir, "openvino")], supported_pt_extensions)
output_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output")
temp_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp")
input_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "input")

304
nodes.py
View File

@ -36,6 +36,11 @@ import folder_paths
import latent_preview
import node_helpers
# Test OpenVINO Execution
from openvino.runtime import Core
import numpy as np
from custom_nodes.openvino_unet import OpenVINOUNetInference
def before_node_execution():
comfy.model_management.throw_exception_if_processing_interrupted()
@ -44,6 +49,46 @@ def interrupt_processing(value=True):
MAX_RESOLUTION=16384
"""
class OpenVINOUNetInference:
@classmethod
def INPUT_TYPES(cls):
return{
"required": {
"model_path": ("STRING", ), "input_data": ("DATA", )
}
}
RETURN_TYPES = ("OUTPUT")
FUNCTION = "infer"
CATEGORY = "custom/openvino"
DESCRIPTION = "Perform inference on a latent image using an OpenVINO-optimized UNet model."
def __init__(self):
self.core = Core()
self.compiled_model = None
self.input_name = None
self.output_name = None
print("OpenVINOUNetInference Node Initialized")
def load_model(self, model_path):
# Loads and compiles OpenVINO model
model = self.core.read_model(model=model_path)
self.compiled_model = self.core.compile_model(model, "CPU")
self.input_name = self.compiled_model.input(0).get_any_name()
self.output_name = self.compiled+model.output(0).get_any_name()
def infer(self, latent_image, model_path):
# Run inference on the latent image
if self.compiled_model is None or self.input_name is None or self.output_name is None:
self.load_model(model_path)
latent_array = latent_image["samples"].detach().cpu().numpy()
output = self.compiled_model.infer({self.input_name: latent_array})
result_tensor = torch.from_numpy(output[self.output_name]).to(latent_image["samples"].device)
latent_image["samples"] = result_tensor
return (latent_image,)
"""
class CLIPTextEncode:
@classmethod
def INPUT_TYPES(s):
@ -66,6 +111,85 @@ class CLIPTextEncode:
cond = output.pop("cond")
return ([[cond, output]], )
class CLIPTextEncodeOpenVINO:
def __init__(self):
self.core = Core()
self.clip_model = None
self.compiled_model = None
self.current_model_path = None
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"text": ("STRING", {"multiline": True, "tooltip": "The text to be encoded."}),
"clip": ("CLIP", {"tooltip": "The OpenVINO CLIP model for text encoding."}),
}
}
RETURN_TYPES = ("CONDITIONING",)
OUTPUT_TOOLTIPS = ("The text encoding for conditioning.",)
FUNCTION = "encode"
CATEGORY = "conditioning"
def load_model(self, clip_path):
"""
Load the already compiled CLIP model from the provided path if it hasn't been loaded yet.
"""
if self.clip_model is None or clip_path != self.current_model_path:
# Load the pre-compiled model
print(f"Loading pre-compiled model from {clip_path}...")
self.clip_model = clip_path.get_runtime_model()
if self.clip_model is None:
raise ValueError(f"Failed to load model from {clip_path}")
# Store the current model path
self.current_model_path = clip_path
print("Model loaded successfully.")
# Assuming the model is already compiled, we directly assign it to compiled_model
self.compiled_model = self.clip_model
def encode(self, clip, text):
"""
Encodes the input text using the provided CLIP model.
"""
# Load the model if it's not already loaded
self.load_model(clip)
# Tokenize the text (replace this with your tokenization logic specific to the CLIP model)
tokens = tokenize_text(text)
print("Tokenization complete.")
# Check if the model is loaded and compiled
if self.compiled_model is None:
raise ValueError("Model has not been loaded or compiled successfully.")
# Prepare the input dictionary for inference (match input name expected by the model)
input_name = self.compiled_model.input() # Assuming single input
print(f"Using input name: {input_name}")
input_data = {input_name: tokens}
# Run inference
#result = self.compiled_model.infer(input_data)
#result = Core.InferRequest.infer(in)
# Extract the output (adjust if there are multiple outputs)
output_name = self.compiled_model.output(0).get_any_name() # Assuming single output
output_data = result[output_name]
return (output_data,) # Return the encoded conditioning as a tuple
def tokenize_text(text):
"""
Tokenize the input text in a way that is compatible with the CLIP model.
This is a placeholder, and you need to implement tokenization according to the CLIP model's requirements.
"""
# Example: Tokenize by converting text into a sequence of word indices or embeddings.
# This should match the tokenization expected by your OpenVINO CLIP model.
return np.array([ord(c) for c in text]) # This is just a placeholder implementation.
class ConditioningCombine:
@classmethod
def INPUT_TYPES(s):
@ -286,6 +410,37 @@ class VAEDecode:
images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
return (images, )
class VAEDecodeOpenVINO:
def __init__(self):
self.core = Core()
self.vae_model = None
self.current_model_path = None
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"samples": ("LATENT", {"tooltip": "Latent to decode."}),
"vae": ("VAE", {"tooltip": "OpenVINO VAE model."}),
}
}
RETURN_TYPES = ("IMAGE",)
OUTPUT_TOOLTIPS = ("Decoded image.",)
FUNCTION = "decode"
CATEGORY = "latent"
def load_model(self, vae_path):
if self.vae_model is None or vae_path != self.current_model_path:
self.vae_model = self.core.read_model(vae_path)
self.compiled_model = self.core.compile_model(self.vae_model, "CPU")
self.current_model_path = vae_path
def decode(self, vae, samples):
self.load_model(vae)
results = self.compiled_model([samples])
return (results["decoded_image"], )
class VAEDecodeTiled:
@classmethod
def INPUT_TYPES(s):
@ -320,6 +475,30 @@ class VAEEncode:
t = vae.encode(pixels[:,:,:,:3])
return ({"samples":t}, )
class VAEEncodeOpenVINO:
def __init__(self):
self.core = Core()
self.vae_model = None
self.current_model_path = None
@classmethod
def INPUT_TYPES(s):
return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", )}}
RETURN_TYPES = ("LATENT",)
FUNCTION = "encode"
CATEGORY = "latent"
def load_model(self, vae_path):
if self.vae_model is None or vae_path != self.current_model_path:
self.vae_model = self.core.read_model(vae_path)
self.compiled_model = self.core.compile_model(self.vae_model, "CPU")
self.current_model_path = vae_path
def encode(self, vae, pixels):
self.load_model(vae)
results = self.compiled_model([pixels])
return ({"samples": results["encoded_latent"]}, )
class VAEEncodeTiled:
@classmethod
def INPUT_TYPES(s):
@ -1456,6 +1635,46 @@ class KSampler:
def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise)
class KSamplerOpenVINO:
def __init__(self):
self.core = Core()
self.model = None
self.current_model_path = None
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"model": ("MODEL", {"tooltip": "The OpenVINO model for denoising."}),
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "Random seed for noise generation."}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "Number of denoising steps."}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01, "tooltip": "Guidance scale."}),
"positive": ("CONDITIONING", {"tooltip": "Positive conditioning input."}),
"negative": ("CONDITIONING", {"tooltip": "Negative conditioning input."}),
"latent_image": ("LATENT", {"tooltip": "Latent image to denoise."}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "tooltip": "Amount of denoising applied."}),
}
}
RETURN_TYPES = ("LATENT",)
OUTPUT_TOOLTIPS = ("The denoised latent.",)
FUNCTION = "sample"
CATEGORY = "sampling"
DESCRIPTION = "Denoises a latent image using OpenVINO."
def load_model(self, model_path):
if self.model is None or model_path != self.current_model_path:
self.model = self.core.read_model(model_path)
self.compiled_model = self.core.compile_model(self.model, "CPU")
self.current_model_path = model_path
def sample(self, model, seed, steps, cfg, positive, negative, latent_image, denoise=1.0):
self.load_model(model)
# Prepare input data (e.g., latent_image, positive/negative embeddings)
input_data = {"latent_image": latent_image, "positive": positive, "negative": negative}
results = self.compiled_model(input_data)
return results["denoised_latent"]
class KSamplerAdvanced:
@classmethod
def INPUT_TYPES(s):
@ -1545,6 +1764,83 @@ class SaveImage:
return { "ui": { "images": results } }
class SaveImageOpenVINO:
def __init__(self):
self.output_dir = folder_paths.get_output_directory()
self.type = "output"
self.prefix_append = ""
self.compress_level = 4
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"images": ("IMAGE", {"tooltip": "The generated image to save."}),
"filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."}),
},
"hidden": {
"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"
},
}
RETURN_TYPES = ()
FUNCTION = "save_images"
OUTPUT_NODE = True
CATEGORY = "image"
DESCRIPTION = "Saves the generated image from OpenVINO pipeline to your output directory."
def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
"""
Save the generated OpenVINO image(s) to disk.
"""
filename_prefix += self.prefix_append
# Get save path details using current model directory or a default directory
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(
filename_prefix, self.output_dir, images.size[0], images.size[1]
)
results = list()
# If there are multiple images, we can loop through, otherwise just handle the single image
for batch_number, image in enumerate([images]): # Wrap the image in a list to handle loop
# Normalize the image data to the range [0, 255]
image_np = np.array(image) # Convert the image to a numpy array
#if image_np.max() > 1.0: # If the image has values greater than 1.0 (already normalized), scale it to [0, 255]
# image_np = np.clip(image_np, 0, 255)
#else:
# image_np = np.clip(image_np * 255, 0, 255) # Normalize to [0, 255]
# Convert the numpy array back to an image
img = Image.fromarray(image_np.astype(np.uint8))
# Add metadata if available
metadata = None
if prompt or extra_pnginfo:
metadata = PngInfo()
if prompt is not None:
metadata.add_text("prompt", json.dumps(prompt))
if extra_pnginfo is not None:
for key, value in extra_pnginfo.items():
metadata.add_text(key, json.dumps(value))
# Save the image with metadata
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
file = f"{filename_with_batch_num}_{counter:05}.png"
images.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level)
# Add result entry
results.append({
"filename": file,
"subfolder": subfolder,
"type": self.type
})
counter += 1
return {"ui": {"images": results}}
class PreviewImage(SaveImage):
def __init__(self):
self.output_dir = folder_paths.get_temp_directory()
@ -1849,12 +2145,17 @@ class ImagePadForOutpaint:
NODE_CLASS_MAPPINGS = {
"OpenVINOUNetInference": OpenVINOUNetInference,
"KSampler": KSampler,
"KSamplerOpenVINO": KSamplerOpenVINO,
"CheckpointLoaderSimple": CheckpointLoaderSimple,
"CLIPTextEncode": CLIPTextEncode,
"CLIPTextEncodeOpenVINO": CLIPTextEncodeOpenVINO,
"CLIPSetLastLayer": CLIPSetLastLayer,
"VAEDecode": VAEDecode,
"VAEDecodeOpenVINO": VAEDecodeOpenVINO,
"VAEEncode": VAEEncode,
"VAEEncodeOpenVINO": VAEEncodeOpenVINO,
"VAEEncodeForInpaint": VAEEncodeForInpaint,
"VAELoader": VAELoader,
"EmptyLatentImage": EmptyLatentImage,
@ -1863,6 +2164,7 @@ NODE_CLASS_MAPPINGS = {
"LatentFromBatch": LatentFromBatch,
"RepeatLatentBatch": RepeatLatentBatch,
"SaveImage": SaveImage,
"SaveImageOpenVINO": SaveImageOpenVINO,
"PreviewImage": PreviewImage,
"LoadImage": LoadImage,
"LoadImageMask": LoadImageMask,
@ -1918,6 +2220,7 @@ NODE_CLASS_MAPPINGS = {
}
NODE_DISPLAY_NAME_MAPPINGS = {
"OpenVINOUNetInference": "OpenVINO Inference",
# Sampling
"KSampler": "KSampler",
"KSamplerAdvanced": "KSampler (Advanced)",
@ -1963,6 +2266,7 @@ NODE_DISPLAY_NAME_MAPPINGS = {
"RepeatLatentBatch": "Repeat Latent Batch",
# Image
"SaveImage": "Save Image",
"SaveImageOpenVINO": "Save Image OpenVINO",
"PreviewImage": "Preview Image",
"LoadImage": "Load Image",
"LoadImageMask": "Load Image (as Mask)",