mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2025-01-25 15:55:18 +00:00
0d45efb7d6
* Fix issue with how PIL loads small PNG files nodes.py Added flag to prevent ValueError: Decompressed Data Too Large when loading PNG images with large metadata such as large embedded color profiles * Update LoadImage node to fix error when loading PNGs in nodes.py Fixed ValueError: Decompressed Data Too Large thrown by PIL when attempting to open PNG files with large embedded ICC colorspaces by setting the following flag to true when loading PNG images: ImageFile.LOAD_TRUNCATED_IMAGES = True * Update node_helpers.py to include open_image helper function open_image includes try except to catch Pillow ValueErrors that occur when large ICC profiles are embedded in images. * Update LoadImage node to use open_image helper function in place of Image.open open_image helper function in node_helpers.py fixes a Pillow error when attempting to open images with large embedded ICC profiles by adding an exception handler to load the image with truncated metadata if regular loading is not possible.
1970 lines
74 KiB
Python
1970 lines
74 KiB
Python
import torch
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import hashlib
|
|
import traceback
|
|
import math
|
|
import time
|
|
import random
|
|
import logging
|
|
|
|
from PIL import Image, ImageOps, ImageSequence
|
|
from PIL.PngImagePlugin import PngInfo
|
|
|
|
import numpy as np
|
|
import safetensors.torch
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy"))
|
|
|
|
import comfy.diffusers_load
|
|
import comfy.samplers
|
|
import comfy.sample
|
|
import comfy.sd
|
|
import comfy.utils
|
|
import comfy.controlnet
|
|
|
|
import comfy.clip_vision
|
|
|
|
import comfy.model_management
|
|
from comfy.cli_args import args
|
|
|
|
import importlib
|
|
|
|
import folder_paths
|
|
import latent_preview
|
|
import node_helpers
|
|
|
|
def before_node_execution():
    """Hook to call before a node runs.

    Delegates to
    ``comfy.model_management.throw_exception_if_processing_interrupted``;
    presumably that raises when an interrupt has been requested.
    """
    comfy.model_management.throw_exception_if_processing_interrupted()
|
|
|
|
def interrupt_processing(value=True):
    """Forward ``value`` to ``comfy.model_management.interrupt_current_processing``.

    Args:
        value: Flag handed through unchanged (defaults to ``True``).
    """
    comfy.model_management.interrupt_current_processing(value)
|
|
|
|
# Upper bound used as the "max" of the pixel-valued INT inputs (width/height/x/y) declared by nodes in this file.
MAX_RESOLUTION=16384
|
|
|
|
class CLIPTextEncode:
    """Node that encodes a text prompt into a CONDITIONING with a CLIP object."""

    @classmethod
    def INPUT_TYPES(cls):
        text_spec = ("STRING", {"multiline": True, "dynamicPrompts": True})
        clip_spec = ("CLIP", )
        return {"required": {"text": text_spec, "clip": clip_spec}}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "encode"

    CATEGORY = "conditioning"

    def encode(self, clip, text):
        """Tokenize ``text`` with ``clip`` and encode the tokens.

        Returns:
            A 1-tuple holding a one-entry conditioning list; the entry pairs
            the encoded result with ``{"pooled_output": pooled}``.
        """
        encoded, pooled_output = clip.encode_from_tokens(clip.tokenize(text), return_pooled=True)
        entry = [encoded, {"pooled_output": pooled_output}]
        return ([entry], )
|
|
|
|
class ConditioningCombine:
    """Node that joins two CONDITIONING inputs into one."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning_1": ("CONDITIONING", ),
            "conditioning_2": ("CONDITIONING", ),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "combine"

    CATEGORY = "conditioning"

    def combine(self, conditioning_1, conditioning_2):
        """Return ``conditioning_1 + conditioning_2`` wrapped in a 1-tuple."""
        merged = conditioning_1 + conditioning_2
        return (merged, )
|
|
|
|
class ConditioningAverage:
    """Node that linearly blends each ``conditioning_to`` entry with the first
    entry of ``conditioning_from``."""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ),
                             "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "addWeighted"

    CATEGORY = "conditioning"

    def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength):
        """Blend conditioning tensors and their pooled outputs.

        Args:
            conditioning_to: List of ``[tensor, options_dict]`` entries to blend into.
            conditioning_from: Conditioning list; only its first entry is used.
            conditioning_to_strength: Weight of ``conditioning_to``; the
                ``conditioning_from`` side gets ``1.0 - conditioning_to_strength``.

        Returns:
            A 1-tuple holding the new conditioning list. Option dicts are
            shallow copies, so the inputs are not mutated.
        """
        out = []

        if len(conditioning_from) > 1:
            logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")

        cond_from = conditioning_from[0][0]
        pooled_output_from = conditioning_from[0][1].get("pooled_output", None)
        from_strength = 1.0 - conditioning_to_strength

        for entry in conditioning_to:
            t1 = entry[0]
            pooled_output_to = entry[1].get("pooled_output", pooled_output_from)
            # Truncate the source to the target's length along dim 1 ...
            t0 = cond_from[:, :t1.shape[1]]
            missing = t1.shape[1] - t0.shape[1]
            if missing > 0:
                # ... or zero-pad it at the end of dim 1. Padding t0 itself keeps
                # its batch size, dtype and device, so this also works for
                # batch sizes other than 1 and for non-CPU tensors.
                t0 = torch.nn.functional.pad(t0, (0, 0, 0, missing))

            tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, from_strength)
            t_to = entry[1].copy()
            if pooled_output_from is not None and pooled_output_to is not None:
                t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, from_strength)
            elif pooled_output_from is not None:
                t_to["pooled_output"] = pooled_output_from

            out.append([tw, t_to])
        return (out, )
|
|
|
|
class ConditioningConcat:
    """Node that appends the first ``conditioning_from`` tensor to every
    ``conditioning_to`` tensor along dimension 1."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning_to": ("CONDITIONING",),
            "conditioning_from": ("CONDITIONING",),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "concat"

    CATEGORY = "conditioning"

    def concat(self, conditioning_to, conditioning_from):
        """Concatenate tensors and return a new conditioning list in a 1-tuple.

        Only the first entry of ``conditioning_from`` is used; a warning is
        logged when it holds more. Option dicts are shallow-copied.
        """
        if len(conditioning_from) > 1:
            logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")

        appended = conditioning_from[0][0]
        result = [
            [torch.cat((entry[0], appended), 1), entry[1].copy()]
            for entry in conditioning_to
        ]
        return (result, )
|
|
|
|
class ConditioningSetArea:
    """Node that attaches an area (given in pixels) and a strength to a conditioning."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
            "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
            "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning"

    def append(self, conditioning, width, height, x, y, strength):
        """Set ``area``, ``strength`` and ``set_area_to_bounds`` on the conditioning.

        The area is stored as ``(height, width, y, x)``, each integer-divided by 8.
        """
        area = (height // 8, width // 8, y // 8, x // 8)
        values = {"area": area,
                  "strength": strength,
                  "set_area_to_bounds": False}
        return (node_helpers.conditioning_set_values(conditioning, values), )
|
|
|
|
class ConditioningSetAreaPercentage:
    """Node that attaches an area given as fractions (0..1) and a strength to a conditioning."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
            "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
            "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
            "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
            "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning"

    def append(self, conditioning, width, height, x, y, strength):
        """Set ``area``, ``strength`` and ``set_area_to_bounds`` on the conditioning.

        The area is stored as ``("percentage", height, width, y, x)``.
        """
        area = ("percentage", height, width, y, x)
        values = {"area": area,
                  "strength": strength,
                  "set_area_to_bounds": False}
        return (node_helpers.conditioning_set_values(conditioning, values), )
|
|
|
|
class ConditioningSetAreaStrength:
    """Node that sets only the ``strength`` value on a conditioning."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning"

    def append(self, conditioning, strength):
        """Return the conditioning with ``strength`` set, wrapped in a 1-tuple."""
        values = {"strength": strength}
        return (node_helpers.conditioning_set_values(conditioning, values), )
|
|
|
|
|
|
class ConditioningSetMask:
    """Node that attaches a mask and a mask strength to a conditioning."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "mask": ("MASK", ),
            "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
            "set_cond_area": (["default", "mask bounds"],),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning"

    def append(self, conditioning, mask, set_cond_area, strength):
        """Set ``mask``, ``set_area_to_bounds`` and ``mask_strength`` on the conditioning.

        ``set_area_to_bounds`` is True for any ``set_cond_area`` other than
        ``"default"``. A mask with fewer than 3 dimensions gets a leading
        dimension added.
        """
        use_mask_bounds = set_cond_area != "default"
        if len(mask.shape) < 3:
            mask = mask.unsqueeze(0)

        values = {"mask": mask,
                  "set_area_to_bounds": use_mask_bounds,
                  "mask_strength": strength}
        return (node_helpers.conditioning_set_values(conditioning, values), )
|
|
|
|
class ConditioningZeroOut:
    """Node that replaces conditioning tensors (and pooled outputs) with zeros."""

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": {"conditioning": ("CONDITIONING", )}}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "zero_out"

    CATEGORY = "advanced/conditioning"

    def zero_out(self, conditioning):
        """Return a zeroed copy of ``conditioning`` in a 1-tuple.

        Each entry's tensor becomes ``zeros_like``; a ``"pooled_output"``
        value, when present, is zeroed too. Option dicts are shallow copies.
        """
        zeroed = []
        for entry in conditioning:
            options = entry[1].copy()
            if "pooled_output" in options:
                options["pooled_output"] = torch.zeros_like(options["pooled_output"])
            zeroed.append([torch.zeros_like(entry[0]), options])
        return (zeroed, )
|
|
|
|
class ConditioningSetTimestepRange:
    """Node that stores ``start_percent`` / ``end_percent`` on a conditioning."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
            "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "set_range"

    CATEGORY = "advanced/conditioning"

    def set_range(self, conditioning, start, end):
        """Return the conditioning with ``start``/``end`` written as
        ``start_percent``/``end_percent``, wrapped in a 1-tuple."""
        values = {"start_percent": start,
                  "end_percent": end}
        return (node_helpers.conditioning_set_values(conditioning, values), )
|
|
|
|
class VAEDecode:
    """Node that decodes a LATENT into an IMAGE with a VAE."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {"samples": ("LATENT", ), "vae": ("VAE", )}
        return {"required": required}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "decode"

    CATEGORY = "latent"

    def decode(self, vae, samples):
        """Return ``vae.decode(samples["samples"])`` wrapped in a 1-tuple."""
        image = vae.decode(samples["samples"])
        return (image, )
|
|
|
|
class VAEDecodeTiled:
    """Node that decodes a LATENT into an IMAGE via ``vae.decode_tiled``."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "samples": ("LATENT", ),
            "vae": ("VAE", ),
            "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}),
        }
        return {"required": required}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "decode"

    CATEGORY = "_for_testing"

    def decode(self, vae, samples, tile_size):
        """Decode with ``tile_x`` and ``tile_y`` both set to ``tile_size // 8``."""
        latent_tile = tile_size // 8
        image = vae.decode_tiled(samples["samples"], tile_x=latent_tile, tile_y=tile_size // 8, )
        return (image, )
|
|
|
|
class VAEEncode:
    """Node that encodes an IMAGE into a LATENT with a VAE."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {"pixels": ("IMAGE", ), "vae": ("VAE", )}
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"

    CATEGORY = "latent"

    def encode(self, vae, pixels):
        """Encode the first three entries of the last axis of ``pixels``.

        Returns:
            A 1-tuple holding ``{"samples": encoded}``.
        """
        first_three = pixels[:, :, :, :3]
        return ({"samples": vae.encode(first_three)}, )
|
|
|
|
class VAEEncodeTiled:
    """Node that encodes an IMAGE into a LATENT via ``vae.encode_tiled``."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "pixels": ("IMAGE", ),
            "vae": ("VAE", ),
            "tile_size": ("INT", {"default": 512, "min": 320, "max": 4096, "step": 64}),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"

    CATEGORY = "_for_testing"

    def encode(self, vae, pixels, tile_size):
        """Encode the first three entries of the last axis of ``pixels`` using
        ``tile_size`` for both ``tile_x`` and ``tile_y``.

        Returns:
            A 1-tuple holding ``{"samples": encoded}``.
        """
        first_three = pixels[:, :, :, :3]
        encoded = vae.encode_tiled(first_three, tile_x=tile_size, tile_y=tile_size, )
        return ({"samples": encoded}, )
|
|
|
|
class VAEEncodeForInpaint:
    """Node that encodes an image for inpainting and emits a grown noise mask."""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"

    CATEGORY = "latent/inpaint"

    def encode(self, vae, pixels, mask, grow_mask_by=6):
        """Encode ``pixels`` with the masked region neutralised to 0.5.

        Args:
            vae: Object exposing ``downscale_ratio`` and ``encode``.
            pixels: Image tensor indexed as ``[batch, dim1, dim2, channel]``.
            mask: Mask tensor; reshaped to ``(-1, 1, H, W)`` and resized to
                the two spatial sizes of ``pixels``.
            grow_mask_by: Side length of the square kernel used to dilate the
                rounded mask; ``0`` leaves the mask as is.

        Returns:
            A 1-tuple holding ``{"samples": latent, "noise_mask": mask}``.
        """
        x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio
        y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio
        mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")

        # Work on a copy so the caller's image tensor is never modified.
        pixels = pixels.clone()
        if pixels.shape[1] != x or pixels.shape[2] != y:
            # Centre-crop both image and mask to a multiple of the VAE's downscale ratio.
            x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2
            y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2
            pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
            mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]

        rounded_mask = mask.round()

        #grow mask by a few pixels to keep things seamless in latent space
        if grow_mask_by == 0:
            mask_erosion = mask
        else:
            # The kernel must share the mask's dtype and device, otherwise
            # conv2d rejects the mismatched input/weight pair.
            kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by), dtype=mask.dtype, device=mask.device)
            padding = math.ceil((grow_mask_by - 1) / 2)

            mask_erosion = torch.clamp(torch.nn.functional.conv2d(rounded_mask, kernel_tensor, padding=padding), 0, 1)

        m = (1.0 - rounded_mask).squeeze(1)
        for i in range(3):
            pixels[:,:,:,i] -= 0.5
            pixels[:,:,:,i] *= m
            pixels[:,:,:,i] += 0.5
        t = vae.encode(pixels)

        # An even kernel size yields one extra row/column from conv2d; slice it back off.
        return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, )
|
|
|
|
|
|
class InpaintModelConditioning:
    """Node that prepares positive/negative conditioning and a latent for inpainting."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "positive": ("CONDITIONING", ),
            "negative": ("CONDITIONING", ),
            "vae": ("VAE", ),
            "pixels": ("IMAGE", ),
            "mask": ("MASK", ),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT")
    RETURN_NAMES = ("positive", "negative", "latent")
    FUNCTION = "encode"

    CATEGORY = "conditioning/inpaint"

    def encode(self, positive, negative, pixels, vae, mask):
        """Encode the image twice (masked and original) and tag both conditionings.

        The mask is resized to the spatial size of ``pixels``; image copy and
        mask are centre-cropped to a multiple of 8 when needed. The masked
        copy is encoded as ``concat_latent_image``, the untouched input as the
        output latent's ``samples``.

        Returns:
            ``(positive, negative, latent)`` where both conditionings carry
            ``concat_latent_image`` and ``concat_mask``, and ``latent`` holds
            ``samples`` and ``noise_mask``.
        """
        full_h = pixels.shape[1]
        full_w = pixels.shape[2]
        x = (full_h // 8) * 8
        y = (full_w // 8) * 8
        mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(full_h, full_w), mode="bilinear")

        orig_pixels = pixels
        # Clone so the in-place masking below leaves the caller's tensor intact.
        pixels = orig_pixels.clone()
        if full_h != x or full_w != y:
            x_offset = (full_h % 8) // 2
            y_offset = (full_w % 8) // 2
            pixels = pixels[:, x_offset:x + x_offset, y_offset:y + y_offset, :]
            mask = mask[:, :, x_offset:x + x_offset, y_offset:y + y_offset]

        keep = (1.0 - mask.round()).squeeze(1)
        for channel in range(3):
            # Pull masked pixels to 0.5 while leaving unmasked ones untouched.
            pixels[:, :, :, channel] -= 0.5
            pixels[:, :, :, channel] *= keep
            pixels[:, :, :, channel] += 0.5
        concat_latent = vae.encode(pixels)
        orig_latent = vae.encode(orig_pixels)

        out_latent = {"samples": orig_latent, "noise_mask": mask}

        tagged = [
            node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent,
                                                                "concat_mask": mask})
            for conditioning in (positive, negative)
        ]
        return (tagged[0], tagged[1], out_latent)
|
|
|
|
|
|
class SaveLatent:
    """Output node that writes a latent tensor (plus optional metadata) to a ``.latent`` file."""

    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "samples": ("LATENT", ),
            "filename_prefix": ("STRING", {"default": "latents/ComfyUI"}),
        }
        hidden = {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}
        return {"required": required,
                "hidden": hidden,
                }

    RETURN_TYPES = ()
    FUNCTION = "save"

    OUTPUT_NODE = True

    CATEGORY = "_for_testing"

    def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
        """Save ``samples["samples"]`` under the output directory.

        Metadata (the JSON-encoded prompt plus every JSON-encoded
        ``extra_pnginfo`` entry) is attached unless ``args.disable_metadata``
        is set.

        Returns:
            ``{"ui": {"latents": [...]}}`` describing the written file.
        """
        full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)

        # support save metadata for latent sharing
        prompt_info = "" if prompt is None else json.dumps(prompt)

        metadata = None
        if not args.disable_metadata:
            metadata = {"prompt": prompt_info}
            if extra_pnginfo is not None:
                for key in extra_pnginfo:
                    metadata[key] = json.dumps(extra_pnginfo[key])

        file = f"{filename}_{counter:05}_.latent"

        results = [{
            "filename": file,
            "subfolder": subfolder,
            "type": "output"
        }]

        file = os.path.join(full_output_folder, file)

        # The empty "latent_format_version_0" tensor is a marker key (LoadLatent checks for it).
        output = {
            "latent_tensor": samples["samples"],
            "latent_format_version_0": torch.tensor([]),
        }

        comfy.utils.save_torch_file(output, file, metadata=metadata)
        return { "ui": { "latents": results } }
|
|
|
|
|
|
class LoadLatent:
    """Node that loads a ``.latent`` file from the input directory."""

    @classmethod
    def INPUT_TYPES(cls):
        input_dir = folder_paths.get_input_directory()
        files = []
        for name in os.listdir(input_dir):
            if os.path.isfile(os.path.join(input_dir, name)) and name.endswith(".latent"):
                files.append(name)
        return {"required": {"latent": [sorted(files), ]}, }

    CATEGORY = "_for_testing"

    RETURN_TYPES = ("LATENT", )
    FUNCTION = "load"

    def load(self, latent):
        """Load the latent file and return ``{"samples": tensor}`` in a 1-tuple.

        Files without the ``"latent_format_version_0"`` key have their tensor
        multiplied by ``1.0 / 0.18215``.
        """
        latent_path = folder_paths.get_annotated_filepath(latent)
        tensors = safetensors.torch.load_file(latent_path, device="cpu")
        if "latent_format_version_0" in tensors:
            multiplier = 1.0
        else:
            multiplier = 1.0 / 0.18215
        samples = {"samples": tensors["latent_tensor"].float() * multiplier}
        return (samples, )

    @classmethod
    def IS_CHANGED(cls, latent):
        """Return the hex SHA-256 digest of the file's bytes."""
        image_path = folder_paths.get_annotated_filepath(latent)
        digest = hashlib.sha256()
        with open(image_path, 'rb') as f:
            digest.update(f.read())
        return digest.hexdigest()

    @classmethod
    def VALIDATE_INPUTS(cls, latent):
        """Return ``True`` if the file exists, otherwise an error message string."""
        if folder_paths.exists_annotated_filepath(latent):
            return True
        return "Invalid latent file: {}".format(latent)
|
|
|
|
|
|
class CheckpointLoader:
    """Node that loads a checkpoint together with an explicit config file."""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ),
                              "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}}
    RETURN_TYPES = ("MODEL", "CLIP", "VAE")
    FUNCTION = "load_checkpoint"

    CATEGORY = "advanced/loaders"

    def load_checkpoint(self, config_name, ckpt_name, output_vae=True, output_clip=True):
        """Resolve both names to full paths and load the checkpoint.

        Args:
            config_name: File name looked up in the ``"configs"`` folder.
            ckpt_name: File name looked up in the ``"checkpoints"`` folder.
            output_vae: Forwarded to ``comfy.sd.load_checkpoint``.
            output_clip: Forwarded to ``comfy.sd.load_checkpoint``.

        Returns:
            Whatever ``comfy.sd.load_checkpoint`` returns.
        """
        config_path = folder_paths.get_full_path("configs", config_name)
        ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
        # Honour the caller's flags instead of hard-coding True; the defaults
        # keep the previous behaviour for callers that pass neither.
        return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"))
|
|
|
|
class CheckpointLoaderSimple:
    """Node that loads a checkpoint, letting the loader guess its config."""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ),
                             }}
    RETURN_TYPES = ("MODEL", "CLIP", "VAE")
    FUNCTION = "load_checkpoint"

    CATEGORY = "loaders"

    def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
        """Resolve ``ckpt_name`` in the ``"checkpoints"`` folder and load it.

        Args:
            ckpt_name: Checkpoint file name.
            output_vae: Forwarded to ``comfy.sd.load_checkpoint_guess_config``.
            output_clip: Forwarded to ``comfy.sd.load_checkpoint_guess_config``.

        Returns:
            The first three items of the loader's result.
        """
        ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
        # Honour the caller's flags instead of hard-coding True; the defaults
        # keep the previous behaviour for callers that pass neither.
        out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"))
        return out[:3]
|
|
|
|
class DiffusersLoader:
    """Node that loads a model stored in diffusers directory layout."""

    @classmethod
    def INPUT_TYPES(cls):
        # A directory qualifies as a model when it contains "model_index.json".
        paths = []
        for search_path in folder_paths.get_folder_paths("diffusers"):
            if not os.path.exists(search_path):
                continue
            for root, subdir, files in os.walk(search_path, followlinks=True):
                if "model_index.json" in files:
                    paths.append(os.path.relpath(root, start=search_path))

        return {"required": {"model_path": (paths,), }}

    RETURN_TYPES = ("MODEL", "CLIP", "VAE")
    FUNCTION = "load_checkpoint"

    CATEGORY = "advanced/loaders/deprecated"

    def load_checkpoint(self, model_path, output_vae=True, output_clip=True):
        """Resolve ``model_path`` against the diffusers folders and load it.

        The first search folder containing ``model_path`` wins; if none does,
        ``model_path`` is passed on unchanged.
        """
        for search_path in folder_paths.get_folder_paths("diffusers"):
            if not os.path.exists(search_path):
                continue
            candidate = os.path.join(search_path, model_path)
            if os.path.exists(candidate):
                model_path = candidate
                break

        embedding_directory = folder_paths.get_folder_paths("embeddings")
        return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=embedding_directory)
|
|
|
|
|
|
class unCLIPCheckpointLoader:
    """Node that loads a checkpoint and also requests its CLIP vision model."""

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ),
                             }}
    RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION")
    FUNCTION = "load_checkpoint"

    CATEGORY = "loaders"

    def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
        """Resolve ``ckpt_name`` in the ``"checkpoints"`` folder and load it.

        Args:
            ckpt_name: Checkpoint file name.
            output_vae: Forwarded to ``comfy.sd.load_checkpoint_guess_config``.
            output_clip: Forwarded to ``comfy.sd.load_checkpoint_guess_config``.

        Returns:
            The loader's result, requested with ``output_clipvision=True``.
        """
        ckpt_path = folder_paths.get_full_path("checkpoints", ckpt_name)
        # Honour the caller's flags instead of hard-coding True; the defaults
        # keep the previous behaviour for callers that pass neither.
        out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=output_vae, output_clip=output_clip, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
        return out
|
|
|
|
class CLIPSetLastLayer:
    """Node that returns a CLIP clone configured via ``clip_layer``."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "clip": ("CLIP", ),
            "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
        }
        return {"required": required}

    RETURN_TYPES = ("CLIP",)
    FUNCTION = "set_last_layer"

    CATEGORY = "conditioning"

    def set_last_layer(self, clip, stop_at_clip_layer):
        """Clone ``clip``, call ``clip_layer(stop_at_clip_layer)`` on the clone
        and return it in a 1-tuple; the input object itself is not modified
        by this method."""
        cloned = clip.clone()
        cloned.clip_layer(stop_at_clip_layer)
        return (cloned,)
|
|
|
|
class LoraLoader:
    """Node that applies a LoRA file to a model and a CLIP object.

    The most recently loaded LoRA is cached on the instance as
    ``(path, data)`` in ``loaded_lora``.
    """

    def __init__(self):
        self.loaded_lora = None

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "model": ("MODEL",),
            "clip": ("CLIP", ),
            "lora_name": (folder_paths.get_filename_list("loras"), ),
            "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
            "strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("MODEL", "CLIP")
    FUNCTION = "load_lora"

    CATEGORY = "loaders"

    def load_lora(self, model, clip, lora_name, strength_model, strength_clip):
        """Return ``(model, clip)`` with the named LoRA applied.

        When both strengths are 0 the inputs are returned untouched and no
        file is read.
        """
        if strength_model == 0 and strength_clip == 0:
            return (model, clip)

        lora_path = folder_paths.get_full_path("loras", lora_name)
        if self.loaded_lora is not None and self.loaded_lora[0] == lora_path:
            lora = self.loaded_lora[1]
        else:
            # Drop the stale cache entry before loading so the old data is no
            # longer referenced from here while the new file is read.
            self.loaded_lora = None
            lora = None

        if lora is None:
            lora = comfy.utils.load_torch_file(lora_path, safe_load=True)
            self.loaded_lora = (lora_path, lora)

        model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip)
        return (model_lora, clip_lora)
|
|
|
|
class LoraLoaderModelOnly(LoraLoader):
    """``LoraLoader`` variant that patches only the model (no CLIP input)."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "model": ("MODEL",),
            "lora_name": (folder_paths.get_filename_list("loras"), ),
            "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("MODEL",)
    FUNCTION = "load_lora_model_only"

    def load_lora_model_only(self, model, lora_name, strength_model):
        """Call ``load_lora`` with ``clip=None`` and CLIP strength 0, and
        return only its first result in a 1-tuple."""
        result = self.load_lora(model, None, lora_name, strength_model, 0)
        return (result[0],)
|
|
|
|
class VAELoader:
    """Node that loads a VAE, including the ``taesd`` / ``taesdxl`` pairs
    found in the ``"vae_approx"`` folder."""

    @staticmethod
    def vae_list():
        """Return the ``"vae"`` file list, plus ``"taesd"`` / ``"taesdxl"``
        when both the encoder and decoder file of that family are present."""
        vaes = folder_paths.get_filename_list("vae")
        approx_vaes = folder_paths.get_filename_list("vae_approx")

        prefixes = ("taesd_decoder.", "taesd_encoder.", "taesdxl_decoder.", "taesdxl_encoder.")
        found = set()
        for v in approx_vaes:
            for prefix in prefixes:
                if v.startswith(prefix):
                    found.add(prefix)
                    break

        if "taesd_decoder." in found and "taesd_encoder." in found:
            vaes.append("taesd")
        if "taesdxl_decoder." in found and "taesdxl_encoder." in found:
            vaes.append("taesdxl")
        return vaes

    @staticmethod
    def load_taesd(name):
        """Build one state dict from the ``<name>_encoder.*`` and
        ``<name>_decoder.*`` files in ``"vae_approx"``.

        Keys are prefixed with ``taesd_encoder.`` / ``taesd_decoder.`` and a
        ``vae_scale`` tensor is added for the two known names.
        """
        sd = {}
        approx_vaes = folder_paths.get_filename_list("vae_approx")

        encoder_prefix = "{}_encoder.".format(name)
        decoder_prefix = "{}_decoder.".format(name)
        encoder = next(filter(lambda a: a.startswith(encoder_prefix), approx_vaes))
        decoder = next(filter(lambda a: a.startswith(decoder_prefix), approx_vaes))

        enc = comfy.utils.load_torch_file(folder_paths.get_full_path("vae_approx", encoder))
        for key in enc:
            sd["taesd_encoder.{}".format(key)] = enc[key]

        dec = comfy.utils.load_torch_file(folder_paths.get_full_path("vae_approx", decoder))
        for key in dec:
            sd["taesd_decoder.{}".format(key)] = dec[key]

        if name == "taesd":
            sd["vae_scale"] = torch.tensor(0.18215)
        elif name == "taesdxl":
            sd["vae_scale"] = torch.tensor(0.13025)
        return sd

    @classmethod
    def INPUT_TYPES(cls):
        return {"required": { "vae_name": (cls.vae_list(), )}}

    RETURN_TYPES = ("VAE",)
    FUNCTION = "load_vae"

    CATEGORY = "loaders"

    #TODO: scale factor?
    def load_vae(self, vae_name):
        """Load the named VAE and return a ``comfy.sd.VAE`` in a 1-tuple."""
        if vae_name in ["taesd", "taesdxl"]:
            sd = self.load_taesd(vae_name)
        else:
            vae_path = folder_paths.get_full_path("vae", vae_name)
            sd = comfy.utils.load_torch_file(vae_path)
        return (comfy.sd.VAE(sd=sd),)
|
|
|
|
class ControlNetLoader:
    """Node that loads a ControlNet file from the ``"controlnet"`` folder."""

    @classmethod
    def INPUT_TYPES(cls):
        names = folder_paths.get_filename_list("controlnet")
        return {"required": { "control_net_name": (names, )}}

    RETURN_TYPES = ("CONTROL_NET",)
    FUNCTION = "load_controlnet"

    CATEGORY = "loaders"

    def load_controlnet(self, control_net_name):
        """Resolve the file name to a full path and load it; returns a 1-tuple."""
        controlnet_path = folder_paths.get_full_path("controlnet", control_net_name)
        return (comfy.controlnet.load_controlnet(controlnet_path),)
|
|
|
|
class DiffControlNetLoader:
    """Node that loads a ControlNet file, passing a model along to the loader."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "model": ("MODEL",),
            "control_net_name": (folder_paths.get_filename_list("controlnet"), ),
        }
        return {"required": required}

    RETURN_TYPES = ("CONTROL_NET",)
    FUNCTION = "load_controlnet"

    CATEGORY = "loaders"

    def load_controlnet(self, model, control_net_name):
        """Resolve the file name to a full path and load it together with
        ``model``; returns a 1-tuple."""
        controlnet_path = folder_paths.get_full_path("controlnet", control_net_name)
        return (comfy.controlnet.load_controlnet(controlnet_path, model),)
|
|
|
|
|
|
class ControlNetApply:
    """Node that attaches a ControlNet (with an image hint) to a conditioning."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "control_net": ("CONTROL_NET", ),
            "image": ("IMAGE", ),
            "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_controlnet"

    CATEGORY = "conditioning"

    def apply_controlnet(self, conditioning, control_net, image, strength):
        """Return a conditioning whose entries carry a configured ControlNet copy.

        A strength of 0 returns ``conditioning`` unchanged. Otherwise each
        entry gets its own ``control_net.copy()`` with the hint set; an
        existing ``'control'`` value is chained as the previous ControlNet,
        and ``'control_apply_to_uncond'`` is set to True.
        """
        if strength == 0:
            return (conditioning, )

        # Move the last axis of the image to position 1 for use as the hint.
        control_hint = image.movedim(-1,1)
        result = []
        for entry in conditioning:
            options = entry[1].copy()
            c_net = control_net.copy().set_cond_hint(control_hint, strength)
            if 'control' in entry[1]:
                c_net.set_previous_controlnet(entry[1]['control'])
            options['control'] = c_net
            options['control_apply_to_uncond'] = True
            result.append([entry[0], options])
        return (result, )
|
|
|
|
|
|
class ControlNetApplyAdvanced:
    """Node that attaches a ControlNet with a start/end range to both the
    positive and the negative conditioning."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "positive": ("CONDITIONING", ),
            "negative": ("CONDITIONING", ),
            "control_net": ("CONTROL_NET", ),
            "image": ("IMAGE", ),
            "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
            "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
            "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING","CONDITIONING")
    RETURN_NAMES = ("positive", "negative")
    FUNCTION = "apply_controlnet"

    CATEGORY = "conditioning"

    def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent):
        """Return ``(positive, negative)`` with the ControlNet attached.

        A strength of 0 returns both inputs unchanged. ControlNet copies are
        cached per previous ``'control'`` value, so entries sharing the same
        previous ControlNet (across both lists) share one new copy.
        ``'control_apply_to_uncond'`` is set to False on every entry.
        """
        if strength == 0:
            return (positive, negative)

        control_hint = image.movedim(-1,1)
        # Maps a previous ControlNet (or None) to the copy chained onto it.
        cnets = {}

        out = []
        for conditioning in (positive, negative):
            patched = []
            for entry in conditioning:
                options = entry[1].copy()

                prev_cnet = options.get('control', None)
                if prev_cnet not in cnets:
                    new_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent))
                    new_net.set_previous_controlnet(prev_cnet)
                    cnets[prev_cnet] = new_net
                c_net = cnets[prev_cnet]

                options['control'] = c_net
                options['control_apply_to_uncond'] = False
                patched.append([entry[0], options])
            out.append(patched)
        return (out[0], out[1])
|
|
|
|
|
|
class UNETLoader:
    """Node that loads a standalone UNet file from the ``"unet"`` folder."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {"unet_name": (folder_paths.get_filename_list("unet"), )}
        return {"required": required}

    RETURN_TYPES = ("MODEL",)
    FUNCTION = "load_unet"

    CATEGORY = "advanced/loaders"

    def load_unet(self, unet_name):
        """Resolve the file name to a full path and load it; returns a 1-tuple."""
        unet_path = folder_paths.get_full_path("unet", unet_name)
        return (comfy.sd.load_unet(unet_path),)
|
|
|
|
class CLIPLoader:
    """Node that loads a single CLIP file for a selectable CLIP type."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "clip_name": (folder_paths.get_filename_list("clip"), ),
            "type": (["stable_diffusion", "stable_cascade"], ),
        }
        return {"required": required}

    RETURN_TYPES = ("CLIP",)
    FUNCTION = "load_clip"

    CATEGORY = "advanced/loaders"

    def load_clip(self, clip_name, type="stable_diffusion"):
        """Load the named CLIP file and return it in a 1-tuple.

        ``type`` equal to ``"stable_cascade"`` selects
        ``CLIPType.STABLE_CASCADE``; any other value selects
        ``CLIPType.STABLE_DIFFUSION``.
        """
        if type == "stable_cascade":
            clip_type = comfy.sd.CLIPType.STABLE_CASCADE
        else:
            clip_type = comfy.sd.CLIPType.STABLE_DIFFUSION

        clip_path = folder_paths.get_full_path("clip", clip_name)
        embedding_directory = folder_paths.get_folder_paths("embeddings")
        clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=embedding_directory, clip_type=clip_type)
        return (clip,)
|
|
|
|
class DualCLIPLoader:
    """Node that loads two CLIP files into a single CLIP object."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "clip_name1": (folder_paths.get_filename_list("clip"), ),
            "clip_name2": (folder_paths.get_filename_list("clip"), ),
        }
        return {"required": required}

    RETURN_TYPES = ("CLIP",)
    FUNCTION = "load_clip"

    CATEGORY = "advanced/loaders"

    def load_clip(self, clip_name1, clip_name2):
        """Resolve both names in the ``"clip"`` folder and load them together;
        returns a 1-tuple."""
        clip_path1 = folder_paths.get_full_path("clip", clip_name1)
        clip_path2 = folder_paths.get_full_path("clip", clip_name2)
        embedding_directory = folder_paths.get_folder_paths("embeddings")
        clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=embedding_directory)
        return (clip,)
|
|
|
|
class CLIPVisionLoader:
    """Node that loads a CLIP vision model from the ``"clip_vision"`` folder."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {"clip_name": (folder_paths.get_filename_list("clip_vision"), )}
        return {"required": required}

    RETURN_TYPES = ("CLIP_VISION",)
    FUNCTION = "load_clip"

    CATEGORY = "loaders"

    def load_clip(self, clip_name):
        """Resolve the file name to a full path and load it; returns a 1-tuple."""
        clip_path = folder_paths.get_full_path("clip_vision", clip_name)
        return (comfy.clip_vision.load(clip_path),)
|
|
|
|
class CLIPVisionEncode:
    """Node that encodes an image with a CLIP vision model."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "clip_vision": ("CLIP_VISION",),
            "image": ("IMAGE",),
        }
        return {"required": required}

    RETURN_TYPES = ("CLIP_VISION_OUTPUT",)
    FUNCTION = "encode"

    CATEGORY = "conditioning"

    def encode(self, clip_vision, image):
        """Return ``clip_vision.encode_image(image)`` wrapped in a 1-tuple."""
        return (clip_vision.encode_image(image),)
|
|
|
|
class StyleModelLoader:
    """Node that loads a style model from the ``"style_models"`` folder."""

    @classmethod
    def INPUT_TYPES(cls):
        names = folder_paths.get_filename_list("style_models")
        return {"required": { "style_model_name": (names, )}}

    RETURN_TYPES = ("STYLE_MODEL",)
    FUNCTION = "load_style_model"

    CATEGORY = "loaders"

    def load_style_model(self, style_model_name):
        """Resolve the file name to a full path and load it; returns a 1-tuple."""
        style_model_path = folder_paths.get_full_path("style_models", style_model_name)
        return (comfy.sd.load_style_model(style_model_path),)
|
|
|
|
|
|
class StyleModelApply:
    """Node that appends a style model's conditioning to every conditioning tensor."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "style_model": ("STYLE_MODEL", ),
            "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_stylemodel"

    CATEGORY = "conditioning/style_model"

    def apply_stylemodel(self, clip_vision_output, style_model, conditioning):
        """Concatenate the style conditioning onto each entry along dimension 1.

        The style model's output has its first two dimensions flattened and a
        leading dimension of size 1 added before concatenation. Option dicts
        are shallow-copied. Returns the new list in a 1-tuple.
        """
        style_cond = style_model.get_cond(clip_vision_output)
        style_cond = style_cond.flatten(start_dim=0, end_dim=1).unsqueeze(dim=0)
        result = [
            [torch.cat((entry[0], style_cond), dim=1), entry[1].copy()]
            for entry in conditioning
        ]
        return (result, )
|
|
|
|
class unCLIPConditioning:
    """Node that records a CLIP vision output (with strength and noise
    augmentation) on every conditioning entry."""

    @classmethod
    def INPUT_TYPES(cls):
        required = {
            "conditioning": ("CONDITIONING", ),
            "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
            "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
            "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_adm"

    CATEGORY = "conditioning"

    def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation):
        """Append an unCLIP record to each entry's ``"unclip_conditioning"`` list.

        A strength of 0 returns ``conditioning`` unchanged. Existing lists are
        copied before the new record is added, so the inputs are not mutated.
        Returns the new conditioning list in a 1-tuple.
        """
        if strength == 0:
            return (conditioning, )

        result = []
        for entry in conditioning:
            options = entry[1].copy()
            record = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}
            if "unclip_conditioning" in options:
                previous = options["unclip_conditioning"][:]
            else:
                previous = []
            options["unclip_conditioning"] = previous + [record]
            result.append([entry[0], options])
        return (result, )
|
|
|
|
class GLIGENLoader:
    """Loader node for files registered under the "gligen" folder."""

    @classmethod
    def INPUT_TYPES(s):
        """Offer every file listed for "gligen" as a selectable name."""
        available = folder_paths.get_filename_list("gligen")
        return {"required": {"gligen_name": (available, )}}

    RETURN_TYPES = ("GLIGEN",)
    FUNCTION = "load_gligen"

    CATEGORY = "loaders"

    def load_gligen(self, gligen_name):
        """Resolve the name to a full path, load it, and return a 1-tuple."""
        full_path = folder_paths.get_full_path("gligen", gligen_name)
        return (comfy.sd.load_gligen(full_path),)
|
|
|
|
class GLIGENTextBoxApply:
    """Node that adds a positioned GLIGEN text box to conditioning entries."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare conditioning, CLIP, GLIGEN model, text and box geometry."""
        required = {
            "conditioning_to": ("CONDITIONING", ),
            "clip": ("CLIP", ),
            "gligen_textbox_model": ("GLIGEN", ),
            "text": ("STRING", {"multiline": True, "dynamicPrompts": True}),
            "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
            "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
            "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
        }
        return {"required": required}

    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"

    CATEGORY = "conditioning/gligen"

    def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y):
        """Append one text box to the "gligen" option of every entry.

        The text is tokenized and encoded once; the pooled output plus the
        box geometry (each value integer-divided by 8) forms the new box. Any
        boxes already stored under "gligen" are kept in front of the new one.
        """
        cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected")
        box = (cond_pooled, height // 8, width // 8, y // 8, x // 8)

        updated = []
        for entry in conditioning_to:
            options = entry[1].copy()
            # Index 2 of an existing "gligen" tuple holds the boxes so far.
            earlier = options['gligen'][2] if "gligen" in options else []
            options['gligen'] = ("position", gligen_textbox_model, earlier + [box])
            updated.append([entry[0], options])
        return (updated, )
|
|
|
|
class EmptyLatentImage:
    """Node that creates an all-zero latent batch."""

    def __init__(self):
        # Device used for the generated tensor, as chosen by model management.
        self.device = comfy.model_management.intermediate_device()

    @classmethod
    def INPUT_TYPES(s):
        """Declare the pixel width/height and the batch size."""
        required = {
            "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}),
            "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8}),
            "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "generate"

    CATEGORY = "latent"

    def generate(self, width, height, batch_size=1):
        """Return ``{"samples": zeros}`` of shape [batch, 4, height//8, width//8]."""
        shape = [batch_size, 4, height // 8, width // 8]
        zeros = torch.zeros(shape, device=self.device)
        return ({"samples": zeros}, )
|
|
|
|
|
|
class LatentFromBatch:
    """Node that extracts a contiguous slice from a latent batch."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent input, the start index and the slice length."""
        required = {
            "samples": ("LATENT",),
            "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}),
            "length": ("INT", {"default": 1, "min": 1, "max": 64}),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "frombatch"

    CATEGORY = "latent/batch"

    def frombatch(self, samples, batch_index, length):
        """Return a copy of ``samples`` restricted to the requested window.

        The start index and length are clamped to the batch. A noise mask,
        when present, is cloned whole if it has a single entry; otherwise it
        is tiled up to the batch size (if shorter) and sliced with the same
        window. "batch_index" is sliced if present, else generated as a range.
        """
        result = samples.copy()
        latents = samples["samples"]
        total = latents.shape[0]

        # Clamp so the window always lies inside the batch.
        batch_index = min(total - 1, batch_index)
        length = min(total - batch_index, length)
        window = slice(batch_index, batch_index + length)

        result["samples"] = latents[window].clone()

        if "noise_mask" in samples:
            masks = samples["noise_mask"]
            if masks.shape[0] == 1:
                result["noise_mask"] = masks.clone()
            else:
                if masks.shape[0] < total:
                    masks = masks.repeat(math.ceil(total / masks.shape[0]), 1, 1, 1)[:total]
                result["noise_mask"] = masks[window].clone()

        if "batch_index" in result:
            result["batch_index"] = samples["batch_index"][window]
        else:
            result["batch_index"] = list(range(batch_index, batch_index + length))
        return (result,)
|
|
|
|
class RepeatLatentBatch:
    """Node that repeats a latent batch a given number of times."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent input and the repeat count."""
        return {"required": { "samples": ("LATENT",),
                              "amount": ("INT", {"default": 1, "min": 1, "max": 64}),
                              }}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "repeat"

    CATEGORY = "latent/batch"

    def repeat(self, samples, amount):
        """Return a copy of ``samples`` with the batch repeated ``amount`` times.

        Args:
            samples: latent dict with a "samples" tensor and optionally
                "noise_mask" and "batch_index".
            amount: how many times the batch is tiled along dim 0.

        Returns:
            1-tuple holding the new latent dict. The input dict is not
            modified.
        """
        s = samples.copy()
        s_in = samples["samples"]

        s["samples"] = s_in.repeat((amount, 1,1,1))
        if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1:
            masks = samples["noise_mask"]
            if masks.shape[0] < s_in.shape[0]:
                # Tile a short mask batch up to the latent batch size first.
                masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
            # Repeat the batch-aligned masks (not the raw input mask) so mask
            # i keeps lining up with latent i in every repetition.
            s["noise_mask"] = masks.repeat((amount, 1,1,1))
        if "batch_index" in s:
            # Shift each repetition past the span of the original indices so
            # the repeated entries get distinct batch indices.
            offset = max(s["batch_index"]) - min(s["batch_index"]) + 1
            s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]]
        return (s,)
|
|
|
|
class LatentUpscale:
    """Node that resizes a latent to a target pixel width and height."""

    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
    crop_methods = ["disabled", "center"]

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent, the resize method, the target size and crop mode."""
        required = {
            "samples": ("LATENT",),
            "upscale_method": (s.upscale_methods,),
            "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "crop": (s.crop_methods,),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "upscale"

    CATEGORY = "latent"

    def upscale(self, samples, upscale_method, width, height, crop):
        """Resize the latent; a 0 dimension is derived from the aspect ratio.

        With both dimensions 0 the input dict is returned untouched. Every
        resolved dimension is raised to at least 64 and divided by 8 before
        being passed to ``comfy.utils.common_upscale``.
        """
        if width == 0 and height == 0:
            return (samples,)

        latents = samples["samples"]
        if width == 0:
            height = max(64, height)
            width = max(64, round(latents.shape[3] * height / latents.shape[2]))
        elif height == 0:
            width = max(64, width)
            height = max(64, round(latents.shape[2] * width / latents.shape[3]))
        else:
            width = max(64, width)
            height = max(64, height)

        resized = samples.copy()
        resized["samples"] = comfy.utils.common_upscale(latents, width // 8, height // 8, upscale_method, crop)
        return (resized,)
|
|
|
|
class LatentUpscaleBy:
    """Node that resizes a latent by a scale factor."""

    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent, the resize method and the scale factor."""
        required = {
            "samples": ("LATENT",),
            "upscale_method": (s.upscale_methods,),
            "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "upscale"

    CATEGORY = "latent"

    def upscale(self, samples, upscale_method, scale_by):
        """Scale the last two tensor dimensions by ``scale_by`` (rounded)."""
        latents = samples["samples"]
        new_width = round(latents.shape[3] * scale_by)
        new_height = round(latents.shape[2] * scale_by)

        resized = samples.copy()
        resized["samples"] = comfy.utils.common_upscale(latents, new_width, new_height, upscale_method, "disabled")
        return (resized,)
|
|
|
|
class LatentRotate:
    """Node that rotates a latent in 90-degree steps."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent input and the rotation choice."""
        required = {
            "samples": ("LATENT",),
            "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "rotate"

    CATEGORY = "latent/transform"

    def rotate(self, samples, rotation):
        """Return a copy of ``samples`` rotated via ``torch.rot90`` on dims [3, 2]."""
        # The first matching prefix decides the number of quarter turns;
        # anything else (e.g. "none") leaves it at 0.
        quarter_turns = 0
        for turns, prefix in enumerate(("90", "180", "270"), start=1):
            if rotation.startswith(prefix):
                quarter_turns = turns
                break

        rotated = samples.copy()
        rotated["samples"] = torch.rot90(samples["samples"], k=quarter_turns, dims=[3, 2])
        return (rotated,)
|
|
|
|
class LatentFlip:
    """Node that mirrors a latent along one of its last two dimensions."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent input and the flip choice."""
        required = {
            "samples": ("LATENT",),
            "flip_method": (["x-axis: vertically", "y-axis: horizontally"],),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "flip"

    CATEGORY = "latent/transform"

    def flip(self, samples, flip_method):
        """Flip dim 2 for methods starting with "x", dim 3 for "y".

        Any other method string returns an unmodified shallow copy.
        """
        if flip_method.startswith("x"):
            axis = 2
        elif flip_method.startswith("y"):
            axis = 3
        else:
            axis = None

        flipped = samples.copy()
        if axis is not None:
            flipped["samples"] = torch.flip(samples["samples"], dims=[axis])
        return (flipped,)
|
|
|
|
class LatentComposite:
    """Node that pastes one latent onto another, optionally feathered."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare destination, source, paste offset and feather width."""
        required = {
            "samples_to": ("LATENT",),
            "samples_from": ("LATENT",),
            "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "composite"

    CATEGORY = "latent"

    def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0):
        """Paste ``samples_from`` into a clone of ``samples_to`` at (x, y).

        ``x``, ``y`` and ``feather`` are integer-divided by 8 first. The
        source is cut off at the destination's bottom/right edge. With a
        non-zero feather, a linear ramp of that many cells is applied on each
        side that is not flush with the destination border, and the source is
        blended with the destination using that mask. ``composite_method`` is
        accepted but not used.
        """
        x = x // 8
        y = y // 8
        feather = feather // 8

        samples_out = samples_to.copy()
        dest = samples_to["samples"]
        canvas = dest.clone()
        source = samples_from["samples"]

        if feather == 0:
            canvas[:, :, y:y + source.shape[2], x:x + source.shape[3]] = source[:, :, :dest.shape[2] - y, :dest.shape[3] - x]
        else:
            source = source[:, :, :dest.shape[2] - y, :dest.shape[3] - x]
            rows = slice(y, y + source.shape[2])
            cols = slice(x, x + source.shape[3])

            mask = torch.ones_like(source)
            for t in range(feather):
                ramp = (1.0 / feather) * (t + 1)
                if y != 0:
                    mask[:, :, t:1 + t, :] *= ramp
                if y + source.shape[2] < dest.shape[2]:
                    mask[:, :, mask.shape[2] - 1 - t:mask.shape[2] - t, :] *= ramp
                if x != 0:
                    mask[:, :, :, t:1 + t] *= ramp
                if x + source.shape[3] < dest.shape[3]:
                    mask[:, :, :, mask.shape[3] - 1 - t:mask.shape[3] - t] *= ramp

            rev_mask = torch.ones_like(mask) - mask
            canvas[:, :, rows, cols] = source[:, :, :dest.shape[2] - y, :dest.shape[3] - x] * mask + canvas[:, :, rows, cols] * rev_mask

        samples_out["samples"] = canvas
        return (samples_out,)
|
|
|
|
class LatentBlend:
    """Node that linearly mixes two latents."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the two latent inputs and the blend factor."""
        return {"required": {
            "samples1": ("LATENT",),
            "samples2": ("LATENT",),
            "blend_factor": ("FLOAT", {
                "default": 0.5,
                "min": 0,
                "max": 1,
                "step": 0.01
            }),
        }}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "blend"

    CATEGORY = "_for_testing"

    def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"):
        """Blend ``samples2`` into ``samples1``.

        Args:
            samples1: latent dict; its other keys are carried to the output.
            samples2: latent dict; resized to ``samples1``'s size when the
                tensor shapes differ.
            blend_factor: weight given to ``samples1``; ``1 - blend_factor``
                is given to the blended ``samples2``.
            blend_mode: only "normal" is supported.

        Returns:
            1-tuple holding a shallow copy of ``samples1`` with the blended
            tensor under "samples".

        Raises:
            ValueError: if ``blend_mode`` is not supported.
        """
        samples_out = samples1.copy()
        samples1 = samples1["samples"]
        samples2 = samples2["samples"]

        if samples1.shape != samples2.shape:
            # The two permute() calls that used to surround this line threw
            # their results away (permute is not in-place), so they had no
            # effect and were removed.
            samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center')

        samples_blended = self.blend_mode(samples1, samples2, blend_mode)
        samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor)
        samples_out["samples"] = samples_blended
        return (samples_out,)

    def blend_mode(self, img1, img2, mode):
        """Return the blend result for ``mode``; "normal" yields ``img2``."""
        if mode == "normal":
            return img2
        raise ValueError(f"Unsupported blend mode: {mode}")
|
|
|
|
class LatentCrop:
    """Node that crops a rectangular window out of a latent."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent, the window size and its top-left offset."""
        required = {
            "samples": ("LATENT",),
            "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
            "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
            "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
            "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "crop"

    CATEGORY = "latent/transform"

    def crop(self, samples, width, height, x, y):
        """Return a copy of ``samples`` sliced to the requested window.

        All four values are integer-divided by 8 to get tensor coordinates.
        The slice is a view of the input tensor, not a clone.
        """
        cropped = samples.copy()
        latents = samples['samples']

        # Enforce a minimum size of 64: the origin may be at most 8 cells
        # (64 // 8) before the right / bottom edge.
        left = min(x // 8, latents.shape[3] - 8)
        top = min(y // 8, latents.shape[2] - 8)

        right = left + width // 8
        bottom = top + height // 8
        cropped['samples'] = latents[:, :, top:bottom, left:right]
        return (cropped,)
|
|
|
|
class SetLatentNoiseMask:
    """Node that stores a mask on a latent under the "noise_mask" key."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the latent and mask inputs."""
        required = {
            "samples": ("LATENT",),
            "mask": ("MASK",),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "set_mask"

    CATEGORY = "latent/inpaint"

    def set_mask(self, samples, mask):
        """Return a copy of ``samples`` with the mask reshaped to (-1, 1, H, W).

        H and W are the mask's last two dimensions.
        """
        rows, cols = mask.shape[-2], mask.shape[-1]
        masked = samples.copy()
        masked["noise_mask"] = mask.reshape((-1, 1, rows, cols))
        return (masked,)
|
|
|
|
def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False):
    """Run ``comfy.sample.sample`` on a latent dict and return the result.

    Noise is all zeros (on the CPU) when ``disable_noise`` is set; otherwise
    it comes from ``comfy.sample.prepare_noise`` using the seed and the
    latent's "batch_index" entry, if any. The latent's "noise_mask" entry, if
    any, is forwarded to the sampler.

    Returns:
        1-tuple holding a shallow copy of ``latent`` whose "samples" entry is
        replaced by the sampler output.
    """
    latent_image = latent["samples"]

    if disable_noise:
        noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu")
    else:
        batch_inds = latent.get("batch_index")
        noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds)

    noise_mask = latent.get("noise_mask")

    callback = latent_preview.prepare_callback(model, steps)
    disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED
    samples = comfy.sample.sample(
        model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
        denoise=denoise,
        disable_noise=disable_noise,
        start_step=start_step,
        last_step=last_step,
        force_full_denoise=force_full_denoise,
        noise_mask=noise_mask,
        callback=callback,
        disable_pbar=disable_pbar,
        seed=seed,
    )

    out = latent.copy()
    out["samples"] = samples
    return (out, )
|
|
|
|
class KSampler:
    """Basic sampling node; a thin wrapper around ``common_ksampler``."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the model, sampler settings, conditioning and latent inputs."""
        required = {
            "model": ("MODEL",),
            "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
            "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}),
            "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
            "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
            "positive": ("CONDITIONING", ),
            "negative": ("CONDITIONING", ),
            "latent_image": ("LATENT", ),
            "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "sample"

    CATEGORY = "sampling"

    def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
        """Forward every argument to ``common_ksampler`` and return its result."""
        return common_ksampler(
            model, seed, steps, cfg, sampler_name, scheduler,
            positive, negative, latent_image,
            denoise=denoise,
        )
|
|
|
|
class KSamplerAdvanced:
    """Sampling node exposing noise control and a start/end step range."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the model, sampler settings, step range and noise toggles."""
        required = {
            "model": ("MODEL",),
            "add_noise": (["enable", "disable"], ),
            "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
            "steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
            "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}),
            "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
            "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
            "positive": ("CONDITIONING", ),
            "negative": ("CONDITIONING", ),
            "latent_image": ("LATENT", ),
            "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
            "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
            "return_with_leftover_noise": (["disable", "enable"], ),
        }
        return {"required": required}

    RETURN_TYPES = ("LATENT",)
    FUNCTION = "sample"

    CATEGORY = "sampling"

    def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0):
        """Translate the string toggles to booleans and call ``common_ksampler``.

        Full denoising is forced unless ``return_with_leftover_noise`` is
        "enable"; noise is disabled only when ``add_noise`` is "disable".
        """
        force_full_denoise = return_with_leftover_noise != "enable"
        disable_noise = add_noise == "disable"
        return common_ksampler(
            model, noise_seed, steps, cfg, sampler_name, scheduler,
            positive, negative, latent_image,
            denoise=denoise,
            disable_noise=disable_noise,
            start_step=start_at_step,
            last_step=end_at_step,
            force_full_denoise=force_full_denoise,
        )
|
|
|
|
class SaveImage:
    """Output node that writes a batch of images to PNG files."""

    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()
        self.type = "output"
        self.prefix_append = ""
        self.compress_level = 4

    @classmethod
    def INPUT_TYPES(s):
        """Declare the images and filename prefix, plus hidden prompt metadata."""
        return {
            "required": {
                "images": ("IMAGE", ),
                "filename_prefix": ("STRING", {"default": "ComfyUI"}),
            },
            "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
        }

    RETURN_TYPES = ()
    FUNCTION = "save_images"

    OUTPUT_NODE = True

    CATEGORY = "image"

    def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
        """Save every image of the batch and describe the written files.

        Pixel values are scaled by 255, clipped to [0, 255] and stored as
        uint8. Unless ``args.disable_metadata`` is set, the prompt and every
        ``extra_pnginfo`` entry are embedded as JSON text chunks. The file
        counter increases by one per image.

        Returns:
            ``{"ui": {"images": [...]}}`` with one filename/subfolder/type
            record per saved file.
        """
        filename_prefix += self.prefix_append
        first = images[0]
        full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(
            filename_prefix, self.output_dir, first.shape[1], first.shape[0])

        results = []
        for batch_number, image in enumerate(images):
            scaled = 255. * image.cpu().numpy()
            img = Image.fromarray(np.clip(scaled, 0, 255).astype(np.uint8))

            metadata = None
            if not args.disable_metadata:
                metadata = PngInfo()
                if prompt is not None:
                    metadata.add_text("prompt", json.dumps(prompt))
                if extra_pnginfo is not None:
                    for key in extra_pnginfo:
                        metadata.add_text(key, json.dumps(extra_pnginfo[key]))

            filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
            file = f"{filename_with_batch_num}_{counter:05}_.png"
            target = os.path.join(full_output_folder, file)
            img.save(target, pnginfo=metadata, compress_level=self.compress_level)

            results.append({"filename": file, "subfolder": subfolder, "type": self.type})
            counter += 1

        return { "ui": { "images": results } }
|
|
|
|
class PreviewImage(SaveImage):
    """Variant of ``SaveImage`` that writes to the temp directory."""

    def __init__(self):
        self.output_dir = folder_paths.get_temp_directory()
        self.type = "temp"
        # Five random characters keep prefixes of different instances apart.
        # NOTE(review): the alphabet literal repeats "p" and has no "w";
        # it is reproduced unchanged here.
        alphabet = "abcdefghijklmnopqrstupvxyz"
        suffix = ''.join(random.choice(alphabet) for _ in range(5))
        self.prefix_append = "_temp_" + suffix
        self.compress_level = 1

    @classmethod
    def INPUT_TYPES(s):
        """Declare only the images input, plus hidden prompt metadata."""
        return {
            "required": {"images": ("IMAGE", ), },
            "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
        }
|
|
|
|
class LoadImage:
    """Node that loads an image file as an image tensor plus a mask."""

    @classmethod
    def INPUT_TYPES(s):
        """List the regular files of the input directory, sorted by name."""
        input_dir = folder_paths.get_input_directory()
        files = [
            name for name in os.listdir(input_dir)
            if os.path.isfile(os.path.join(input_dir, name))
        ]
        return {"required": {"image": (sorted(files), {"image_upload": True})}}

    CATEGORY = "image"

    RETURN_TYPES = ("IMAGE", "MASK")
    FUNCTION = "load_image"

    def load_image(self, image):
        """Load every frame of the file and return stacked images and masks.

        Each frame is EXIF-transposed and converted to RGB floats in [0, 1].
        The mask is the inverted alpha channel when the frame has one, else a
        64x64 block of zeros. With more than one frame the tensors are
        concatenated on dim 0.

        NOTE(review): frames of differing sizes, or a mix of frames with and
        without alpha, would presumably make ``torch.cat`` fail - confirm.
        """
        image_path = folder_paths.get_annotated_filepath(image)
        img = node_helpers.open_image(image_path)

        output_images = []
        output_masks = []
        for frame in ImageSequence.Iterator(img):
            frame = ImageOps.exif_transpose(frame)
            if frame.mode == 'I':
                frame = frame.point(lambda value: value * (1 / 255))

            rgb = np.array(frame.convert("RGB")).astype(np.float32) / 255.0
            rgb = torch.from_numpy(rgb)[None,]

            if 'A' in frame.getbands():
                alpha = np.array(frame.getchannel('A')).astype(np.float32) / 255.0
                mask = 1. - torch.from_numpy(alpha)
            else:
                mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")

            output_images.append(rgb)
            output_masks.append(mask.unsqueeze(0))

        if len(output_images) > 1:
            output_image = torch.cat(output_images, dim=0)
            output_mask = torch.cat(output_masks, dim=0)
        else:
            output_image = output_images[0]
            output_mask = output_masks[0]

        return (output_image, output_mask)

    @classmethod
    def IS_CHANGED(s, image):
        """Return the SHA-256 hex digest of the file's bytes."""
        image_path = folder_paths.get_annotated_filepath(image)
        with open(image_path, 'rb') as f:
            return hashlib.sha256(f.read()).hexdigest()

    @classmethod
    def VALIDATE_INPUTS(s, image):
        """Return True if the file exists, else an error message string."""
        if folder_paths.exists_annotated_filepath(image):
            return True
        return "Invalid image file: {}".format(image)
|
|
|
|
class LoadImageMask:
    """Node that loads one colour channel of an image file as a mask."""

    _color_channels = ["alpha", "red", "green", "blue"]

    @classmethod
    def INPUT_TYPES(s):
        """List the regular files of the input directory plus the channel choice."""
        input_dir = folder_paths.get_input_directory()
        files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
        return {"required":
                    {"image": (sorted(files), {"image_upload": True}),
                     "channel": (s._color_channels, ), }
                }

    CATEGORY = "mask"

    RETURN_TYPES = ("MASK",)
    FUNCTION = "load_image"

    def load_image(self, image, channel):
        """Load ``channel`` of the image as a float mask with a leading dim.

        Args:
            image: annotated file name, resolved through ``folder_paths``.
            channel: channel name; only its first letter, upper-cased, is
                used to pick the band.

        Returns:
            1-tuple with the mask tensor. The alpha channel is inverted
            (``1 - alpha``); a 64x64 block of zeros is returned when the
            requested band is absent.
        """
        image_path = folder_paths.get_annotated_filepath(image)
        # Open through the shared helper, exactly as LoadImage does, so both
        # nodes handle the same set of files.
        i = node_helpers.open_image(image_path)
        i = ImageOps.exif_transpose(i)
        if i.getbands() != ("R", "G", "B", "A"):
            if i.mode == 'I':
                i = i.point(lambda i: i * (1 / 255))
            i = i.convert("RGBA")
        c = channel[0].upper()
        if c in i.getbands():
            mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0
            mask = torch.from_numpy(mask)
            if c == 'A':
                mask = 1. - mask
        else:
            mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
        return (mask.unsqueeze(0),)

    @classmethod
    def IS_CHANGED(s, image, channel):
        """Return the SHA-256 hex digest of the file's bytes."""
        image_path = folder_paths.get_annotated_filepath(image)
        m = hashlib.sha256()
        with open(image_path, 'rb') as f:
            m.update(f.read())
        return m.digest().hex()

    @classmethod
    def VALIDATE_INPUTS(s, image):
        """Return True if the file exists, else an error message string."""
        if not folder_paths.exists_annotated_filepath(image):
            return "Invalid image file: {}".format(image)

        return True
|
|
|
|
class ImageScale:
    """Node that resizes an image to a target width and height."""

    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
    crop_methods = ["disabled", "center"]

    @classmethod
    def INPUT_TYPES(s):
        """Declare the image, the resize method, the target size and crop mode."""
        required = {
            "image": ("IMAGE",),
            "upscale_method": (s.upscale_methods,),
            "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
            "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
            "crop": (s.crop_methods,),
        }
        return {"required": required}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "upscale"

    CATEGORY = "image/upscaling"

    def upscale(self, image, upscale_method, width, height, crop):
        """Resize the image; a 0 dimension is derived from the aspect ratio.

        With both dimensions 0 the input tensor is returned untouched.
        Otherwise the last dimension is moved to position 1 for
        ``comfy.utils.common_upscale`` and moved back afterwards.
        """
        if width == 0 and height == 0:
            return (image,)

        samples = image.movedim(-1,1)
        if width == 0:
            width = max(1, round(samples.shape[3] * height / samples.shape[2]))
        elif height == 0:
            height = max(1, round(samples.shape[2] * width / samples.shape[3]))

        resized = comfy.utils.common_upscale(samples, width, height, upscale_method, crop)
        return (resized.movedim(1,-1),)
|
|
|
|
class ImageScaleBy:
    """Node that resizes an image by a scale factor."""

    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]

    @classmethod
    def INPUT_TYPES(s):
        """Declare the image, the resize method and the scale factor."""
        required = {
            "image": ("IMAGE",),
            "upscale_method": (s.upscale_methods,),
            "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),
        }
        return {"required": required}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "upscale"

    CATEGORY = "image/upscaling"

    def upscale(self, image, upscale_method, scale_by):
        """Scale both spatial dimensions by ``scale_by`` (rounded).

        The last dimension is moved to position 1 for
        ``comfy.utils.common_upscale`` and moved back afterwards.
        """
        samples = image.movedim(-1,1)
        new_width = round(samples.shape[3] * scale_by)
        new_height = round(samples.shape[2] * scale_by)
        resized = comfy.utils.common_upscale(samples, new_width, new_height, upscale_method, "disabled")
        return (resized.movedim(1,-1),)
|
|
|
|
class ImageInvert:
    """Node that inverts an image by subtracting it from 1.0."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the single required image input."""
        return {"required": {"image": ("IMAGE",)}}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "invert"

    CATEGORY = "image"

    def invert(self, image):
        """Return ``1.0 - image`` wrapped in a 1-tuple."""
        return (1.0 - image,)
|
|
|
|
class ImageBatch:
    """Node that joins two image batches along the batch dimension."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the two required image inputs."""
        return {"required": {"image1": ("IMAGE",), "image2": ("IMAGE",)}}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "batch"

    CATEGORY = "image"

    def batch(self, image1, image2):
        """Concatenate the batches, resizing ``image2`` first if needed.

        When the shapes differ past dim 0, ``image2`` is resized
        (bilinear, centre crop) to ``image1``'s size before concatenation.
        """
        if image1.shape[1:] != image2.shape[1:]:
            channels_first = image2.movedim(-1,1)
            resized = comfy.utils.common_upscale(channels_first, image1.shape[2], image1.shape[1], "bilinear", "center")
            image2 = resized.movedim(1,-1)
        return (torch.cat((image1, image2), dim=0),)
|
|
|
|
class EmptyImage:
    """Node that creates a batch of single-colour images."""

    def __init__(self, device="cpu"):
        self.device = device

    @classmethod
    def INPUT_TYPES(s):
        """Declare the image size, batch size and packed 0xRRGGBB colour."""
        required = {
            "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
            "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
            "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}),
            "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}),
        }
        return {"required": required}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "generate"

    CATEGORY = "image"

    def generate(self, width, height, batch_size=1, color=0):
        """Return a [batch, height, width, 3] tensor filled with ``color``.

        The red, green and blue bytes are taken from bits 16-23, 8-15 and
        0-7 of ``color`` and each is divided by 255.
        """
        plane_shape = [batch_size, height, width, 1]
        planes = [
            torch.full(plane_shape, ((color >> shift) & 0xFF) / 0xFF)
            for shift in (16, 8, 0)
        ]
        return (torch.cat(planes, dim=-1), )
|
|
|
|
class ImagePadForOutpaint:
    """Node that pads an image with grey borders and builds an outpaint mask."""

    @classmethod
    def INPUT_TYPES(s):
        """Declare the image, the four padding widths and the feather width."""
        return {
            "required": {
                "image": ("IMAGE",),
                "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
            }
        }

    RETURN_TYPES = ("IMAGE", "MASK")
    FUNCTION = "expand_image"

    CATEGORY = "image"

    def expand_image(self, image, left, top, right, bottom, feathering):
        """Pad ``image`` and return it together with the matching mask.

        Args:
            image: 4-D tensor; dims 1 and 2 are the padded height and width.
            left, top, right, bottom: padding added on each side.
            feathering: width of the soft transition inside the original
                image area, applied only next to sides that are padded. It
                is ignored unless it is positive and less than half of both
                the image height and width.

        Returns:
            ``(new_image, mask)``. ``new_image`` is float32, filled with 0.5
            in the padding. ``mask`` is a 2-D float32 tensor that is 1 in the
            padding and, inside the original area, falls off as
            ``((feathering - d) / feathering) ** 2`` where ``d`` is the
            distance to the nearest padded side, reaching 0 at
            ``d >= feathering``.
        """
        d1, d2, d3, d4 = image.size()

        new_image = torch.ones(
            (d1, d2 + top + bottom, d3 + left + right, d4),
            dtype=torch.float32,
        ) * 0.5

        new_image[:, top:top + d2, left:left + d3, :] = image

        mask = torch.ones(
            (d2 + top + bottom, d3 + left + right),
            dtype=torch.float32,
        )

        t = torch.zeros(
            (d2, d3),
            dtype=torch.float32
        )

        if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3:
            # Vectorised form of the former per-pixel loop. The maths is done
            # in float64 and cast once so the values match what the scalar
            # Python arithmetic produced.
            rows = torch.arange(d2, dtype=torch.float64).unsqueeze(1)
            cols = torch.arange(d3, dtype=torch.float64).unsqueeze(0)

            # A side without padding contributes the full image extent, which
            # always exceeds the feather width and so never wins the minimum.
            dt = rows if top != 0 else torch.full_like(rows, d2)
            db = (d2 - rows) if bottom != 0 else torch.full_like(rows, d2)
            dl = cols if left != 0 else torch.full_like(cols, d3)
            dr = (d3 - cols) if right != 0 else torch.full_like(cols, d3)

            # Broadcasts the (d2, 1) and (1, d3) distances to (d2, d3).
            d = torch.minimum(torch.minimum(dt, db), torch.minimum(dl, dr))

            v = (feathering - d) / feathering
            t = torch.where(d >= feathering, torch.zeros_like(v), v * v).to(torch.float32)

        mask[top:top + d2, left:left + d3] = t

        return (new_image, mask)
|
|
|
|
|
|
# Maps each node identifier to the class that implements it.
NODE_CLASS_MAPPINGS = {
    "KSampler": KSampler,
    "CheckpointLoaderSimple": CheckpointLoaderSimple,
    "CLIPTextEncode": CLIPTextEncode,
    "CLIPSetLastLayer": CLIPSetLastLayer,
    "VAEDecode": VAEDecode,
    "VAEEncode": VAEEncode,
    "VAEEncodeForInpaint": VAEEncodeForInpaint,
    "VAELoader": VAELoader,
    "EmptyLatentImage": EmptyLatentImage,
    "LatentUpscale": LatentUpscale,
    "LatentUpscaleBy": LatentUpscaleBy,
    "LatentFromBatch": LatentFromBatch,
    "RepeatLatentBatch": RepeatLatentBatch,
    "SaveImage": SaveImage,
    "PreviewImage": PreviewImage,
    "LoadImage": LoadImage,
    "LoadImageMask": LoadImageMask,
    "ImageScale": ImageScale,
    "ImageScaleBy": ImageScaleBy,
    "ImageInvert": ImageInvert,
    "ImageBatch": ImageBatch,
    "ImagePadForOutpaint": ImagePadForOutpaint,
    "EmptyImage": EmptyImage,
    "ConditioningAverage": ConditioningAverage,
    "ConditioningCombine": ConditioningCombine,
    "ConditioningConcat": ConditioningConcat,
    "ConditioningSetArea": ConditioningSetArea,
    "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage,
    "ConditioningSetAreaStrength": ConditioningSetAreaStrength,
    "ConditioningSetMask": ConditioningSetMask,
    "KSamplerAdvanced": KSamplerAdvanced,
    "SetLatentNoiseMask": SetLatentNoiseMask,
    "LatentComposite": LatentComposite,
    "LatentBlend": LatentBlend,
    "LatentRotate": LatentRotate,
    "LatentFlip": LatentFlip,
    "LatentCrop": LatentCrop,
    "LoraLoader": LoraLoader,
    "CLIPLoader": CLIPLoader,
    "UNETLoader": UNETLoader,
    "DualCLIPLoader": DualCLIPLoader,
    "CLIPVisionEncode": CLIPVisionEncode,
    "StyleModelApply": StyleModelApply,
    "unCLIPConditioning": unCLIPConditioning,
    "ControlNetApply": ControlNetApply,
    "ControlNetApplyAdvanced": ControlNetApplyAdvanced,
    "ControlNetLoader": ControlNetLoader,
    "DiffControlNetLoader": DiffControlNetLoader,
    "StyleModelLoader": StyleModelLoader,
    "CLIPVisionLoader": CLIPVisionLoader,
    "VAEDecodeTiled": VAEDecodeTiled,
    "VAEEncodeTiled": VAEEncodeTiled,
    "unCLIPCheckpointLoader": unCLIPCheckpointLoader,
    "GLIGENLoader": GLIGENLoader,
    "GLIGENTextBoxApply": GLIGENTextBoxApply,
    "InpaintModelConditioning": InpaintModelConditioning,

    "CheckpointLoader": CheckpointLoader,
    "DiffusersLoader": DiffusersLoader,

    "LoadLatent": LoadLatent,
    "SaveLatent": SaveLatent,

    "ConditioningZeroOut": ConditioningZeroOut,
    "ConditioningSetTimestepRange": ConditioningSetTimestepRange,
    "LoraLoaderModelOnly": LoraLoaderModelOnly,
}
|
|
|
|
# Maps node identifiers to the human-readable names shown for them.
# Keys must match the identifiers used in NODE_CLASS_MAPPINGS exactly.
NODE_DISPLAY_NAME_MAPPINGS = {
    # Sampling
    "KSampler": "KSampler",
    "KSamplerAdvanced": "KSampler (Advanced)",
    # Loaders
    "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)",
    "CheckpointLoaderSimple": "Load Checkpoint",
    "VAELoader": "Load VAE",
    "LoraLoader": "Load LoRA",
    "CLIPLoader": "Load CLIP",
    "ControlNetLoader": "Load ControlNet Model",
    "DiffControlNetLoader": "Load ControlNet Model (diff)",
    "StyleModelLoader": "Load Style Model",
    "CLIPVisionLoader": "Load CLIP Vision",
    "UpscaleModelLoader": "Load Upscale Model",
    # Conditioning
    "CLIPVisionEncode": "CLIP Vision Encode",
    "StyleModelApply": "Apply Style Model",
    "CLIPTextEncode": "CLIP Text Encode (Prompt)",
    "CLIPSetLastLayer": "CLIP Set Last Layer",
    "ConditioningCombine": "Conditioning (Combine)",
    # The key used to carry a trailing space and so never matched the node.
    "ConditioningAverage": "Conditioning (Average)",
    "ConditioningConcat": "Conditioning (Concat)",
    "ConditioningSetArea": "Conditioning (Set Area)",
    "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)",
    "ConditioningSetMask": "Conditioning (Set Mask)",
    "ControlNetApply": "Apply ControlNet",
    "ControlNetApplyAdvanced": "Apply ControlNet (Advanced)",
    # Latent
    "VAEEncodeForInpaint": "VAE Encode (for Inpainting)",
    "SetLatentNoiseMask": "Set Latent Noise Mask",
    "VAEDecode": "VAE Decode",
    "VAEEncode": "VAE Encode",
    "LatentRotate": "Rotate Latent",
    "LatentFlip": "Flip Latent",
    "LatentCrop": "Crop Latent",
    "EmptyLatentImage": "Empty Latent Image",
    "LatentUpscale": "Upscale Latent",
    "LatentUpscaleBy": "Upscale Latent By",
    "LatentComposite": "Latent Composite",
    "LatentBlend": "Latent Blend",
    "LatentFromBatch": "Latent From Batch",
    "RepeatLatentBatch": "Repeat Latent Batch",
    # Image
    "SaveImage": "Save Image",
    "PreviewImage": "Preview Image",
    "LoadImage": "Load Image",
    "LoadImageMask": "Load Image (as Mask)",
    "ImageScale": "Upscale Image",
    "ImageScaleBy": "Upscale Image By",
    "ImageUpscaleWithModel": "Upscale Image (using Model)",
    "ImageInvert": "Invert Image",
    "ImagePadForOutpaint": "Pad Image for Outpainting",
    "ImageBatch": "Batch Images",
    # _for_testing
    "VAEDecodeTiled": "VAE Decode (Tiled)",
    "VAEEncodeTiled": "VAE Encode (Tiled)",
}
|
|
|
|
# Maps a custom node module name to the absolute path of its WEB_DIRECTORY;
# filled in by load_custom_node() when that directory exists.
EXTENSION_WEB_DIRS = {}
|
|
|
|
def load_custom_node(module_path, ignore=None):
    """Import one custom node module and register what it exports.

    ``module_path`` is either a single source file or a package directory
    (imported through its ``__init__.py``).  After a successful import:

    * a non-None ``WEB_DIRECTORY`` that resolves (relative to the module's
      directory) to an existing directory is recorded in
      ``EXTENSION_WEB_DIRS`` under the module name;
    * every entry of the module's ``NODE_CLASS_MAPPINGS`` whose name is not
      in ``ignore`` is copied into the global ``NODE_CLASS_MAPPINGS``;
    * the module's ``NODE_DISPLAY_NAME_MAPPINGS``, if present, is merged
      into the global ``NODE_DISPLAY_NAME_MAPPINGS``.

    Args:
        module_path: Path to a ``.py`` file or to a package directory.
        ignore: Optional collection of node names that must not be
            registered from this module.  ``None`` ignores nothing.

    Returns:
        True if the module imported and defines ``NODE_CLASS_MAPPINGS``;
        False if it lacks that attribute or if anything raised while
        importing or registering (the error is logged, never propagated).
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule is loaded, so import it explicitly where it is used.
    import importlib.util

    # A fresh empty collection per call instead of a shared mutable default.
    if ignore is None:
        ignore = ()

    if os.path.isfile(module_path):
        # For a single file the module name is the path minus its extension.
        module_name = os.path.splitext(module_path)[0]
        spec_location = module_path
        module_dir = os.path.split(module_path)[0]
    else:
        module_name = os.path.basename(module_path)
        spec_location = os.path.join(module_path, "__init__.py")
        module_dir = module_path

    try:
        logging.debug("Trying to load custom node {}".format(module_path))
        module_spec = importlib.util.spec_from_file_location(module_name, spec_location)
        module = importlib.util.module_from_spec(module_spec)

        # Register before executing so the module can be found by name
        # while its own body is running.
        sys.modules[module_name] = module
        try:
            module_spec.loader.exec_module(module)
        except Exception:
            # Mirror the import system: a module whose body failed must not
            # stay behind half-initialized in sys.modules.
            if sys.modules.get(module_name) is module:
                del sys.modules[module_name]
            raise

        web_directory = getattr(module, "WEB_DIRECTORY", None)
        if web_directory is not None:
            web_dir = os.path.abspath(os.path.join(module_dir, web_directory))
            if os.path.isdir(web_dir):
                EXTENSION_WEB_DIRS[module_name] = web_dir

        node_classes = getattr(module, "NODE_CLASS_MAPPINGS", None)
        if node_classes is None:
            logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.")
            return False

        for name in node_classes:
            if name not in ignore:
                NODE_CLASS_MAPPINGS[name] = node_classes[name]

        display_names = getattr(module, "NODE_DISPLAY_NAME_MAPPINGS", None)
        if display_names is not None:
            NODE_DISPLAY_NAME_MAPPINGS.update(display_names)
        return True
    except Exception as e:
        # Best effort: one broken custom node must not stop the others
        # from loading, so log the failure and report it to the caller.
        logging.warning(traceback.format_exc())
        logging.warning(f"Cannot import {module_path} module for custom nodes: {e}")
        return False
|
|
|
|
def load_custom_nodes():
    """Import every candidate module found in the "custom_nodes" folders.

    Each directory entry is handed to load_custom_node(), except
    ``__pycache__``, paths ending in ``.disabled`` and regular files whose
    extension is not ``.py``.  Once all folders have been processed, the
    import time of every attempted module is logged in ascending order,
    with failed imports flagged.
    """
    # Snapshot of the names registered before any custom module is loaded;
    # it is passed as the ignore collection so those entries are kept.
    builtin_names = set(NODE_CLASS_MAPPINGS.keys())
    timings = []

    for root in folder_paths.get_folder_paths("custom_nodes"):
        for entry in os.listdir(os.path.realpath(root)):
            if entry == "__pycache__":
                continue
            candidate = os.path.join(root, entry)
            if candidate.endswith(".disabled"):
                continue
            if os.path.isfile(candidate) and os.path.splitext(candidate)[1] != ".py":
                continue

            started = time.perf_counter()
            loaded = load_custom_node(candidate, builtin_names)
            timings.append((time.perf_counter() - started, candidate, loaded))

    if not timings:
        return

    logging.info("\nImport times for custom nodes:")
    for elapsed, path, loaded in sorted(timings):
        suffix = "" if loaded else " (IMPORT FAILED)"
        logging.info("{:6.1f} seconds{}: {}".format(elapsed, suffix, path))
    logging.info("")
|
|
|
|
def init_custom_nodes():
    """Load the bundled ``comfy_extras`` node files, then the custom nodes.

    Every file listed below is imported from the ``comfy_extras`` directory
    next to this file via load_custom_node(); load_custom_nodes() runs
    afterwards.  If any bundled file failed to import, a warning listing
    the failed files and a suggested remedy is logged.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    extras_dir = os.path.join(here, "comfy_extras")
    extras_files = [
        "nodes_latent.py",
        "nodes_hypernetwork.py",
        "nodes_upscale_model.py",
        "nodes_post_processing.py",
        "nodes_mask.py",
        "nodes_compositing.py",
        "nodes_rebatch.py",
        "nodes_model_merging.py",
        "nodes_tomesd.py",
        "nodes_clip_sdxl.py",
        "nodes_canny.py",
        "nodes_freelunch.py",
        "nodes_custom_sampler.py",
        "nodes_hypertile.py",
        "nodes_model_advanced.py",
        "nodes_model_downscale.py",
        "nodes_images.py",
        "nodes_video_model.py",
        "nodes_sag.py",
        "nodes_perpneg.py",
        "nodes_stable3d.py",
        "nodes_sdupscale.py",
        "nodes_photomaker.py",
        "nodes_cond.py",
        "nodes_morphology.py",
        "nodes_stable_cascade.py",
        "nodes_differential_diffusion.py",
        "nodes_ip2p.py",
        "nodes_model_merging_model_specific.py",
        "nodes_pag.py",
        "nodes_align_your_steps.py",
        "nodes_attention_multiply.py",
        "nodes_advanced_samplers.py",
    ]

    # Bundled files are attempted in list order; keep the ones that failed.
    import_failed = [
        node_file
        for node_file in extras_files
        if not load_custom_node(os.path.join(extras_dir, node_file))
    ]

    load_custom_nodes()

    if not import_failed:
        return

    logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n")
    for node in import_failed:
        logging.warning("IMPORT FAILED: {}".format(node))
    logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.")

    if args.windows_standalone_build:
        remedy = "Please run the update script: update/update_comfyui.bat"
    else:
        remedy = "Please do a: pip install -r requirements.txt"
    logging.warning(remedy)
    logging.warning("")
|