ComfyUI/nodes.py
Robin Huang f58f0f5696
More API nodes: Gemini/OpenAI Chat, Tripo, Rodin, Runway Image (#8295)
* Add Ideogram generate node.

* Add staging api.

* Add API_NODE and common error for missing auth token (#5)

* Add Minimax Video Generation + Async Task queue polling example (#6)

* [Minimax] Show video preview and embed workflow in output (#7)

* Remove uv.lock

* Remove polling operations.

* Revert "Remove polling operations."

This reverts commit 8415404ce8fbc0262b7de54fc700c5c8854a34fc.

* Update stubs.

* Added Ideogram and Minimax back in.

* Added initial BFL Flux 1.1 [pro] Ultra node (#11)

* Manually add BFL polling status response schema (#15)

* Add function for uploading files. (#18)

* Add Luma nodes (#16)

Co-authored-by: Robin Huang <robin.j.huang@gmail.com>

* Refactor util functions (#20)

* Add rest of Luma node functionality (#19)

Co-authored-by: Robin Huang <robin.j.huang@gmail.com>

* Fix image_luma_ref not working (#28)

Co-authored-by: Robin Huang <robin.j.huang@gmail.com>

* [Bug] Remove duplicated option T2V-01 in MinimaxTextToVideoNode (#31)

* add veo2, bump av req (#32)

* Add Recraft nodes (#29)

* Add Kling Nodes (#12)

* Add Camera Concepts (luma_concepts) to Luma Video nodes (#33)

Co-authored-by: Robin Huang <robin.j.huang@gmail.com>

* Add Runway nodes (#17)

* Convert Minimax node to use VIDEO output type (#34)

* Standard `CATEGORY` system for api nodes (#35)

* Set `Content-Type` header when uploading files (#36)

* add better error propagation to veo2 (#37)

* Add Realistic Image and Logo Raster styles for Recraft v3 (#38)

* Fix runway image upload and progress polling (#39)

* Fix image upload for Luma: only include `Content-Type` header field if it's set explicitly (#40)

* Moved Luma nodes to nodes_luma.py (#47)

* Moved Recraft nodes to nodes_recraft.py (#48)

* Move and fix BFL nodes to node_bfl.py (#49)

* Move and edit Minimax node to nodes_minimax.py (#50)

* Add Recraft Text to Vector node, add Save SVG node to handle its output (#53)

* Added pixverse_template support to Pixverse Text to Video node (#54)

* Added Recraft Controls + Recraft Color RGB nodes (#57)

* split remaining nodes out of nodes_api, make utility lib, refactor ideogram (#61)

* Set request type explicitly (#66)

* Add `control_after_generate` to all seed inputs (#69)

* Fix bug: deleting `Content-Type` when property does not exist (#73)

* Add Pixverse and updated Kling types (#75)

* Added Recraft Style - Infinite Style Library node (#82)

* add ideogram v3 (#83)

* [Kling] Split Camera Control config to its own node (#81)

* Add Pika i2v and t2v nodes (#52)

* Remove Runway nodes (#88)

* Fix: Prompt text can't be validated in Kling nodes when using primitive nodes (#90)

* Update Pika Duration and Resolution options (#94)

* Removed Infinite Style Library until later (#99)

* fix multi image return (#101)

close #96

* Serve SVG files directly (#107)

* Add a bunch of nodes, 3 ready to use, the rest waiting for endpoint support (#108)

* Revert "Serve SVG files directly" (#111)

* Expose 4 remaining Recraft nodes (#112)

* [Kling] Add `Duration` and `Video ID` outputs (#105)

* Add Kling nodes: camera control, start-end frame, lip-sync, video extend (#115)

* Fix error for Recraft ImageToImage error for nonexistent random_seed param (#118)

* Add remaining Pika nodes (#119)

* Make controls input work for Recraft Image to Image node (#120)

* Fix: Nested `AnyUrl` in request model cannot be serialized (Kling, Runway) (#129)

* Show errors and API output URLs to the user (change log levels) (#131)

* Apply small fixes and most prompt validation (if needed to avoid API error) (#135)

* Node name/category modifications (#140)

* Add back Recraft Style - Infinite Style Library node (#141)

* [Kling] Fix: Correct/verify supported subset of input combos in Kling nodes (#149)

* Remove pixverse_template from PixVerse Transition Video node (#155)

* Use 3.9 compat syntax (#164)

* Handle Comfy API key based authorization (#167)

Co-authored-by: Jedrzej Kosinski <kosinkadink1@gmail.com>

* [BFL] Print download URL of successful task result directly on nodes (#175)

* Show output URL and progress text on Pika nodes (#168)

* [Ideogram] Print download URL of successful task result directly on nodes (#176)

* [Kling] Print download URL of successful task result directly on nodes (#181)

* Merge upstream may 14 25 (#186)

Co-authored-by: comfyanonymous <comfyanonymous@protonmail.com>
Co-authored-by: AustinMroz <austinmroz@utexas.edu>
Co-authored-by: comfyanonymous <121283862+comfyanonymous@users.noreply.github.com>
Co-authored-by: Benjamin Lu <benceruleanlu@proton.me>
Co-authored-by: Andrew Kvochko <kvochko@users.noreply.github.com>
Co-authored-by: Pam <42671363+pamparamm@users.noreply.github.com>
Co-authored-by: chaObserv <154517000+chaObserv@users.noreply.github.com>
Co-authored-by: Yoland Yan <4950057+yoland68@users.noreply.github.com>
Co-authored-by: guill <guill@users.noreply.github.com>
Co-authored-by: Chenlei Hu <hcl@comfy.org>
Co-authored-by: Terry Jia <terryjia88@gmail.com>
Co-authored-by: Silver <65376327+silveroxides@users.noreply.github.com>
Co-authored-by: catboxanon <122327233+catboxanon@users.noreply.github.com>
Co-authored-by: liesen <liesen.dev@gmail.com>
Co-authored-by: Kohaku-Blueleaf <59680068+KohakuBlueleaf@users.noreply.github.com>
Co-authored-by: Jedrzej Kosinski <kosinkadink1@gmail.com>
Co-authored-by: Robin Huang <robin.j.huang@gmail.com>
Co-authored-by: thot experiment <94414189+thot-experiment@users.noreply.github.com>
Co-authored-by: blepping <157360029+blepping@users.noreply.github.com>

* Update instructions on how to develop API Nodes. (#171)

* Add Runway FLF and I2V nodes (#187)

* Add OpenAI chat node (#188)

* Update README.

* Add Google Gemini API node (#191)

* Add Runway Gen 4 Text to Image Node (#193)

* [Runway, Gemini] Update node display names and attributes (#194)

* Update path from "image-to-video" to "image_to_video" (#197)

* [Runway] Split I2V nodes into separate gen3 and gen4 nodes (#198)

* Update runway i2v ratio enum (#201)

* Rodin3D: implement Rodin3D API Nodes (#190)

Co-authored-by: WhiteGiven <c15838568211@163.com>
Co-authored-by: Robin Huang <robin.j.huang@gmail.com>

* Add Tripo Nodes. (#189)

Co-authored-by: Robin Huang <robin.j.huang@gmail.com>

* Change casing of categories "3D" => "3d" (#208)

* [tripo] fix negtive_prompt and mv2model (#212)

* [tripo] set default param to None (#215)

* Add description and tooltip to Tripo Refine model. (#218)

* Update.

* Fix rebase errors.

* Fix rebase errors.

* Update templates.

* Bump frontend.

* Add file type info for file inputs.

---------

Co-authored-by: Christian Byrne <cbyrne@comfy.org>
Co-authored-by: Jedrzej Kosinski <kosinkadink1@gmail.com>
Co-authored-by: Chenlei Hu <hcl@comfy.org>
Co-authored-by: thot experiment <94414189+thot-experiment@users.noreply.github.com>
Co-authored-by: comfyanonymous <comfyanonymous@protonmail.com>
Co-authored-by: AustinMroz <austinmroz@utexas.edu>
Co-authored-by: comfyanonymous <121283862+comfyanonymous@users.noreply.github.com>
Co-authored-by: Benjamin Lu <benceruleanlu@proton.me>
Co-authored-by: Andrew Kvochko <kvochko@users.noreply.github.com>
Co-authored-by: Pam <42671363+pamparamm@users.noreply.github.com>
Co-authored-by: chaObserv <154517000+chaObserv@users.noreply.github.com>
Co-authored-by: Yoland Yan <4950057+yoland68@users.noreply.github.com>
Co-authored-by: guill <guill@users.noreply.github.com>
Co-authored-by: Terry Jia <terryjia88@gmail.com>
Co-authored-by: Silver <65376327+silveroxides@users.noreply.github.com>
Co-authored-by: catboxanon <122327233+catboxanon@users.noreply.github.com>
Co-authored-by: liesen <liesen.dev@gmail.com>
Co-authored-by: Kohaku-Blueleaf <59680068+KohakuBlueleaf@users.noreply.github.com>
Co-authored-by: blepping <157360029+blepping@users.noreply.github.com>
Co-authored-by: Changrz <51637999+WhiteGiven@users.noreply.github.com>
Co-authored-by: WhiteGiven <c15838568211@163.com>
Co-authored-by: seed93 <liangding1990@163.com>
2025-05-27 03:00:58 -04:00


from __future__ import annotations
import torch
import os
import sys
import json
import hashlib
import traceback
import math
import time
import random
import logging
from PIL import Image, ImageOps, ImageSequence
from PIL.PngImagePlugin import PngInfo
import numpy as np
import safetensors.torch
sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy"))
import comfy.diffusers_load
import comfy.samplers
import comfy.sample
import comfy.sd
import comfy.utils
import comfy.controlnet
from comfy.comfy_types import IO, ComfyNodeABC, InputTypeDict, FileLocator
import comfy.clip_vision
import comfy.model_management
from comfy.cli_args import args
import importlib
import folder_paths
import latent_preview
import node_helpers
def before_node_execution():
    comfy.model_management.throw_exception_if_processing_interrupted()

def interrupt_processing(value=True):
    comfy.model_management.interrupt_current_processing(value)

MAX_RESOLUTION=16384

class CLIPTextEncode(ComfyNodeABC):
    @classmethod
    def INPUT_TYPES(s) -> InputTypeDict:
        return {
            "required": {
                "text": (IO.STRING, {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}),
                "clip": (IO.CLIP, {"tooltip": "The CLIP model used for encoding the text."})
            }
        }
    RETURN_TYPES = (IO.CONDITIONING,)
    OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",)
    FUNCTION = "encode"
    CATEGORY = "conditioning"
    DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images."

    def encode(self, clip, text):
        if clip is None:
            raise RuntimeError("ERROR: clip input is invalid: None\n\nIf the clip is from a checkpoint loader node your checkpoint does not contain a valid clip or text encoder model.")
        tokens = clip.tokenize(text)
        return (clip.encode_from_tokens_scheduled(tokens), )

class ConditioningCombine:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "combine"
    CATEGORY = "conditioning"

    def combine(self, conditioning_1, conditioning_2):
        return (conditioning_1 + conditioning_2, )

class ConditioningAverage:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ),
                             "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01})
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "addWeighted"
    CATEGORY = "conditioning"

    def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength):
        out = []

        if len(conditioning_from) > 1:
            logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")

        cond_from = conditioning_from[0][0]
        pooled_output_from = conditioning_from[0][1].get("pooled_output", None)
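
        # Truncate cond_from to conditioning_to's token length (zero-padding when it
        # is shorter), then lerp each conditioning_to entry toward it by
        # conditioning_to_strength; pooled outputs are lerped the same way.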
        for i in range(len(conditioning_to)):
            t1 = conditioning_to[i][0]
            pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from)
            t0 = cond_from[:,:t1.shape[1]]
            if t0.shape[1] < t1.shape[1]:
                t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1)

            tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength))
            t_to = conditioning_to[i][1].copy()
            if pooled_output_from is not None and pooled_output_to is not None:
                t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength))
            elif pooled_output_from is not None:
                t_to["pooled_output"] = pooled_output_from

            n = [tw, t_to]
            out.append(n)
        return (out, )

class ConditioningConcat:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "conditioning_to": ("CONDITIONING",),
            "conditioning_from": ("CONDITIONING",),
        }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "concat"
    CATEGORY = "conditioning"

    def concat(self, conditioning_to, conditioning_from):
        out = []

        if len(conditioning_from) > 1:
            logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.")

        cond_from = conditioning_from[0][0]

        for i in range(len(conditioning_to)):
            t1 = conditioning_to[i][0]
            tw = torch.cat((t1, cond_from), 1)
            n = [tw, conditioning_to[i][1].copy()]
            out.append(n)
        return (out, )

class ConditioningSetArea:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                             "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                             "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"
    CATEGORY = "conditioning"

    def append(self, conditioning, width, height, x, y, strength):
        c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8),
                                                                "strength": strength,
                                                                "set_area_to_bounds": False})
        return (c, )

class ConditioningSetAreaPercentage:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
                             "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}),
                             "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
                             "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"
    CATEGORY = "conditioning"

    def append(self, conditioning, width, height, x, y, strength):
        c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x),
                                                                "strength": strength,
                                                                "set_area_to_bounds": False})
        return (c, )

class ConditioningSetAreaStrength:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"
    CATEGORY = "conditioning"

    def append(self, conditioning, strength):
        c = node_helpers.conditioning_set_values(conditioning, {"strength": strength})
        return (c, )

class ConditioningSetMask:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "mask": ("MASK", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             "set_cond_area": (["default", "mask bounds"],),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"
    CATEGORY = "conditioning"

    def append(self, conditioning, mask, set_cond_area, strength):
        set_area_to_bounds = False
        if set_cond_area != "default":
            set_area_to_bounds = True
        if len(mask.shape) < 3:
            mask = mask.unsqueeze(0)

        c = node_helpers.conditioning_set_values(conditioning, {"mask": mask,
                                                                "set_area_to_bounds": set_area_to_bounds,
                                                                "mask_strength": strength})
        return (c, )

class ConditioningZeroOut:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", )}}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "zero_out"
    CATEGORY = "advanced/conditioning"

    def zero_out(self, conditioning):
        c = []
        for t in conditioning:
            d = t[1].copy()
            pooled_output = d.get("pooled_output", None)
            if pooled_output is not None:
                d["pooled_output"] = torch.zeros_like(pooled_output)
            conditioning_lyrics = d.get("conditioning_lyrics", None)
            if conditioning_lyrics is not None:
                d["conditioning_lyrics"] = torch.zeros_like(conditioning_lyrics)
            n = [torch.zeros_like(t[0]), d]
            c.append(n)
        return (c, )

class ConditioningSetTimestepRange:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
                             "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "set_range"
    CATEGORY = "advanced/conditioning"

    def set_range(self, conditioning, start, end):
        c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start,
                                                                "end_percent": end})
        return (c, )

class VAEDecode:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "samples": ("LATENT", {"tooltip": "The latent to be decoded."}),
                "vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."})
            }
        }
    RETURN_TYPES = ("IMAGE",)
    OUTPUT_TOOLTIPS = ("The decoded image.",)
    FUNCTION = "decode"
    CATEGORY = "latent"
    DESCRIPTION = "Decodes latent images back into pixel space images."

    def decode(self, vae, samples):
        images = vae.decode(samples["samples"])
        if len(images.shape) == 5: #Combine batches
            images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
        return (images, )

class VAEDecodeTiled:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ),
                             "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 32}),
                             "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}),
                             "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to decode at a time."}),
                             "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}),
                             }}
    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "decode"
    CATEGORY = "_for_testing"
    def decode(self, vae, samples, tile_size, overlap=64, temporal_size=64, temporal_overlap=8):
        if tile_size < overlap * 4:
            overlap = tile_size // 4
        if temporal_size < temporal_overlap * 2:
            temporal_overlap = temporal_overlap // 2
        temporal_compression = vae.temporal_compression_decode()
        if temporal_compression is not None:
            temporal_size = max(2, temporal_size // temporal_compression)
            temporal_overlap = max(1, min(temporal_size // 2, temporal_overlap // temporal_compression))
        else:
            temporal_size = None
            temporal_overlap = None

        compression = vae.spacial_compression_decode()
        images = vae.decode_tiled(samples["samples"], tile_x=tile_size // compression, tile_y=tile_size // compression, overlap=overlap // compression, tile_t=temporal_size, overlap_t=temporal_overlap)
        if len(images.shape) == 5: #Combine batches
            images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1])
        return (images, )

class VAEEncode:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"
    CATEGORY = "latent"

    def encode(self, vae, pixels):
        t = vae.encode(pixels[:,:,:,:3])
        return ({"samples": t}, )

class VAEEncodeTiled:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ),
                             "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}),
                             "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}),
                             "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to encode at a time."}),
                             "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}),
                             }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"
    CATEGORY = "_for_testing"

    def encode(self, vae, pixels, tile_size, overlap, temporal_size=64, temporal_overlap=8):
        t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, overlap=overlap, tile_t=temporal_size, overlap_t=temporal_overlap)
        return ({"samples": t}, )

class VAEEncodeForInpaint:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "encode"
    CATEGORY = "latent/inpaint"
    def encode(self, vae, pixels, mask, grow_mask_by=6):
        x = (pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio
        y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio
        mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")

        pixels = pixels.clone()
        if pixels.shape[1] != x or pixels.shape[2] != y:
            x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2
            y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2
            pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
            mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]

        #grow mask by a few pixels to keep things seamless in latent space
        if grow_mask_by == 0:
            mask_erosion = mask
        else:
            kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by))
            padding = math.ceil((grow_mask_by - 1) / 2)
            mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1)

        m = (1.0 - mask.round()).squeeze(1)
        for i in range(3):
            pixels[:,:,:,i] -= 0.5
            pixels[:,:,:,i] *= m
            pixels[:,:,:,i] += 0.5
        t = vae.encode(pixels)

        return ({"samples": t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, )

class InpaintModelConditioning:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"positive": ("CONDITIONING", ),
                             "negative": ("CONDITIONING", ),
                             "vae": ("VAE", ),
                             "pixels": ("IMAGE", ),
                             "mask": ("MASK", ),
                             "noise_mask": ("BOOLEAN", {"default": True, "tooltip": "Add a noise mask to the latent so sampling will only happen within the mask. Might improve results or completely break things depending on the model."}),
                             }}
    RETURN_TYPES = ("CONDITIONING", "CONDITIONING", "LATENT")
    RETURN_NAMES = ("positive", "negative", "latent")
    FUNCTION = "encode"
    CATEGORY = "conditioning/inpaint"
    def encode(self, positive, negative, pixels, vae, mask, noise_mask=True):
        x = (pixels.shape[1] // 8) * 8
        y = (pixels.shape[2] // 8) * 8
        mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear")

        orig_pixels = pixels
        pixels = orig_pixels.clone()
        if pixels.shape[1] != x or pixels.shape[2] != y:
            x_offset = (pixels.shape[1] % 8) // 2
            y_offset = (pixels.shape[2] % 8) // 2
            pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:]
            mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset]

        m = (1.0 - mask.round()).squeeze(1)
        for i in range(3):
            pixels[:,:,:,i] -= 0.5
            pixels[:,:,:,i] *= m
            pixels[:,:,:,i] += 0.5

        concat_latent = vae.encode(pixels)
        orig_latent = vae.encode(orig_pixels)

        out_latent = {}
        out_latent["samples"] = orig_latent
        if noise_mask:
            out_latent["noise_mask"] = mask

        out = []
        for conditioning in [positive, negative]:
            c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent,
                                                                    "concat_mask": mask})
            out.append(c)
        return (out[0], out[1], out_latent)

class SaveLatent:
    def __init__(self):
        self.output_dir = folder_paths.get_output_directory()

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT", ),
                              "filename_prefix": ("STRING", {"default": "latents/ComfyUI"})},
                "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
                }
    RETURN_TYPES = ()
    FUNCTION = "save"
    OUTPUT_NODE = True
    CATEGORY = "_for_testing"

    def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
        full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir)

        # support saving metadata for latent sharing
        prompt_info = ""
        if prompt is not None:
            prompt_info = json.dumps(prompt)

        metadata = None
        if not args.disable_metadata:
            metadata = {"prompt": prompt_info}
            if extra_pnginfo is not None:
                for x in extra_pnginfo:
                    metadata[x] = json.dumps(extra_pnginfo[x])

        file = f"{filename}_{counter:05}_.latent"

        results: list[FileLocator] = []
        results.append({
            "filename": file,
            "subfolder": subfolder,
            "type": "output"
        })

        file = os.path.join(full_output_folder, file)

        output = {}
        output["latent_tensor"] = samples["samples"].contiguous()
        output["latent_format_version_0"] = torch.tensor([])

        comfy.utils.save_torch_file(output, file, metadata=metadata)
        return { "ui": { "latents": results } }

class LoadLatent:
    @classmethod
    def INPUT_TYPES(s):
        input_dir = folder_paths.get_input_directory()
        files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")]
        return {"required": {"latent": [sorted(files), ]}, }

    CATEGORY = "_for_testing"
    RETURN_TYPES = ("LATENT", )
    FUNCTION = "load"

    def load(self, latent):
        latent_path = folder_paths.get_annotated_filepath(latent)
        latent = safetensors.torch.load_file(latent_path, device="cpu")
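        # Latents saved before the "latent_format_version_0" marker existed were
        # stored pre-multiplied by the SD1.x scale factor (0.18215); divide it back
        # out when loading those older files.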
        multiplier = 1.0
        if "latent_format_version_0" not in latent:
            multiplier = 1.0 / 0.18215
        samples = {"samples": latent["latent_tensor"].float() * multiplier}
        return (samples, )

    @classmethod
    def IS_CHANGED(s, latent):
        image_path = folder_paths.get_annotated_filepath(latent)
        m = hashlib.sha256()
        with open(image_path, 'rb') as f:
            m.update(f.read())
        return m.digest().hex()

    @classmethod
    def VALIDATE_INPUTS(s, latent):
        if not folder_paths.exists_annotated_filepath(latent):
            return "Invalid latent file: {}".format(latent)
        return True

class CheckpointLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ),
                              "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}}
    RETURN_TYPES = ("MODEL", "CLIP", "VAE")
    FUNCTION = "load_checkpoint"
    CATEGORY = "advanced/loaders"
    DEPRECATED = True

    def load_checkpoint(self, config_name, ckpt_name):
        config_path = folder_paths.get_full_path("configs", config_name)
        ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name)
        return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))

class CheckpointLoaderSimple:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "ckpt_name": (folder_paths.get_filename_list("checkpoints"), {"tooltip": "The name of the checkpoint (model) to load."}),
            }
        }
    RETURN_TYPES = ("MODEL", "CLIP", "VAE")
    OUTPUT_TOOLTIPS = ("The model used for denoising latents.",
                       "The CLIP model used for encoding text prompts.",
                       "The VAE model used for encoding and decoding images to and from latent space.")
    FUNCTION = "load_checkpoint"
    CATEGORY = "loaders"
    DESCRIPTION = "Loads a diffusion model checkpoint; diffusion models are used to denoise latents."

    def load_checkpoint(self, ckpt_name):
        ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name)
        out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
        return out[:3]

class DiffusersLoader:
    @classmethod
    def INPUT_TYPES(cls):
        paths = []
        for search_path in folder_paths.get_folder_paths("diffusers"):
            if os.path.exists(search_path):
                for root, subdir, files in os.walk(search_path, followlinks=True):
                    if "model_index.json" in files:
                        paths.append(os.path.relpath(root, start=search_path))

        return {"required": {"model_path": (paths,), }}
    RETURN_TYPES = ("MODEL", "CLIP", "VAE")
    FUNCTION = "load_checkpoint"
    CATEGORY = "advanced/loaders/deprecated"

    def load_checkpoint(self, model_path, output_vae=True, output_clip=True):
        for search_path in folder_paths.get_folder_paths("diffusers"):
            if os.path.exists(search_path):
                path = os.path.join(search_path, model_path)
                if os.path.exists(path):
                    model_path = path
                    break

        return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings"))

class unCLIPCheckpointLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ),
                              }}
    RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION")
    FUNCTION = "load_checkpoint"
    CATEGORY = "loaders"

    def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True):
        ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name)
        out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings"))
        return out

class CLIPSetLastLayer:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip": ("CLIP", ),
                              "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}),
                              }}
    RETURN_TYPES = ("CLIP",)
    FUNCTION = "set_last_layer"
    CATEGORY = "conditioning"

    def set_last_layer(self, clip, stop_at_clip_layer):
        clip = clip.clone()
        clip.clip_layer(stop_at_clip_layer)
        return (clip,)

class LoraLoader:
    def __init__(self):
        self.loaded_lora = None

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}),
                "clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}),
                "lora_name": (folder_paths.get_filename_list("loras"), {"tooltip": "The name of the LoRA."}),
                "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}),
                "strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}),
            }
        }
    RETURN_TYPES = ("MODEL", "CLIP")
    OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.")
    FUNCTION = "load_lora"
    CATEGORY = "loaders"
    DESCRIPTION = "LoRAs are used to modify the diffusion and CLIP models, altering the way in which latents are denoised, such as by applying styles. Multiple LoRA nodes can be linked together."

    def load_lora(self, model, clip, lora_name, strength_model, strength_clip):
        if strength_model == 0 and strength_clip == 0:
            return (model, clip)

        lora_path = folder_paths.get_full_path_or_raise("loras", lora_name)
        lora = None
        if self.loaded_lora is not None:
            if self.loaded_lora[0] == lora_path:
                lora = self.loaded_lora[1]
            else:
                self.loaded_lora = None

        if lora is None:
            lora = comfy.utils.load_torch_file(lora_path, safe_load=True)
            self.loaded_lora = (lora_path, lora)

        model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip)
        return (model_lora, clip_lora)

class LoraLoaderModelOnly(LoraLoader):
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "model": ("MODEL",),
                              "lora_name": (folder_paths.get_filename_list("loras"), ),
                              "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}),
                              }}
    RETURN_TYPES = ("MODEL",)
    FUNCTION = "load_lora_model_only"

    def load_lora_model_only(self, model, lora_name, strength_model):
        return (self.load_lora(model, None, lora_name, strength_model, 0)[0],)

class VAELoader:
    @staticmethod
    def vae_list():
        vaes = folder_paths.get_filename_list("vae")
        approx_vaes = folder_paths.get_filename_list("vae_approx")
        sdxl_taesd_enc = False
        sdxl_taesd_dec = False
        sd1_taesd_enc = False
        sd1_taesd_dec = False
        sd3_taesd_enc = False
        sd3_taesd_dec = False
        f1_taesd_enc = False
        f1_taesd_dec = False

        for v in approx_vaes:
            if v.startswith("taesd_decoder."):
                sd1_taesd_dec = True
            elif v.startswith("taesd_encoder."):
                sd1_taesd_enc = True
            elif v.startswith("taesdxl_decoder."):
                sdxl_taesd_dec = True
            elif v.startswith("taesdxl_encoder."):
                sdxl_taesd_enc = True
            elif v.startswith("taesd3_decoder."):
                sd3_taesd_dec = True
            elif v.startswith("taesd3_encoder."):
                sd3_taesd_enc = True
elif v.startswith("taef1_encoder."):
f1_taesd_dec = True
elif v.startswith("taef1_decoder."):
f1_taesd_enc = True
        if sd1_taesd_dec and sd1_taesd_enc:
            vaes.append("taesd")
        if sdxl_taesd_dec and sdxl_taesd_enc:
            vaes.append("taesdxl")
        if sd3_taesd_dec and sd3_taesd_enc:
            vaes.append("taesd3")
        if f1_taesd_dec and f1_taesd_enc:
            vaes.append("taef1")
        return vaes

    @staticmethod
    def load_taesd(name):
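        # Assembles a single TAESD state dict from the separate encoder/decoder files
        # and attaches the latent scale/shift constants for the matching model family.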
        sd = {}
        approx_vaes = folder_paths.get_filename_list("vae_approx")

        encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes))
        decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes))

        enc = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder))
        for k in enc:
            sd["taesd_encoder.{}".format(k)] = enc[k]

        dec = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder))
        for k in dec:
            sd["taesd_decoder.{}".format(k)] = dec[k]

        if name == "taesd":
            sd["vae_scale"] = torch.tensor(0.18215)
            sd["vae_shift"] = torch.tensor(0.0)
        elif name == "taesdxl":
            sd["vae_scale"] = torch.tensor(0.13025)
            sd["vae_shift"] = torch.tensor(0.0)
        elif name == "taesd3":
            sd["vae_scale"] = torch.tensor(1.5305)
            sd["vae_shift"] = torch.tensor(0.0609)
        elif name == "taef1":
            sd["vae_scale"] = torch.tensor(0.3611)
            sd["vae_shift"] = torch.tensor(0.1159)
        return sd

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "vae_name": (s.vae_list(), )}}
    RETURN_TYPES = ("VAE",)
    FUNCTION = "load_vae"
    CATEGORY = "loaders"

    #TODO: scale factor?
    def load_vae(self, vae_name):
        if vae_name in ["taesd", "taesdxl", "taesd3", "taef1"]:
            sd = self.load_taesd(vae_name)
        else:
            vae_path = folder_paths.get_full_path_or_raise("vae", vae_name)
            sd = comfy.utils.load_torch_file(vae_path)
        vae = comfy.sd.VAE(sd=sd)
        vae.throw_exception_if_invalid()
        return (vae,)

class ControlNetLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}}
    RETURN_TYPES = ("CONTROL_NET",)
    FUNCTION = "load_controlnet"
    CATEGORY = "loaders"

    def load_controlnet(self, control_net_name):
        controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name)
        controlnet = comfy.controlnet.load_controlnet(controlnet_path)
        if controlnet is None:
            raise RuntimeError("ERROR: controlnet file is invalid and does not contain a valid controlnet model.")
        return (controlnet,)

class DiffControlNetLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "model": ("MODEL",),
                              "control_net_name": (folder_paths.get_filename_list("controlnet"), )}}
    RETURN_TYPES = ("CONTROL_NET",)
    FUNCTION = "load_controlnet"
    CATEGORY = "loaders"

    def load_controlnet(self, model, control_net_name):
        controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name)
        controlnet = comfy.controlnet.load_controlnet(controlnet_path, model)
        return (controlnet,)

class ControlNetApply:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "control_net": ("CONTROL_NET", ),
                             "image": ("IMAGE", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01})
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_controlnet"
    DEPRECATED = True
    CATEGORY = "conditioning/controlnet"

    def apply_controlnet(self, conditioning, control_net, image, strength):
        if strength == 0:
            return (conditioning, )

        c = []
        control_hint = image.movedim(-1, 1)
        for t in conditioning:
            n = [t[0], t[1].copy()]
            c_net = control_net.copy().set_cond_hint(control_hint, strength)
            if 'control' in t[1]:
                c_net.set_previous_controlnet(t[1]['control'])
            n[1]['control'] = c_net
            n[1]['control_apply_to_uncond'] = True
            c.append(n)
        return (c, )

class ControlNetApplyAdvanced:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"positive": ("CONDITIONING", ),
                             "negative": ("CONDITIONING", ),
                             "control_net": ("CONTROL_NET", ),
                             "image": ("IMAGE", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}),
                             "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}),
                             "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001})
                             },
                "optional": {"vae": ("VAE", ),
                             }
                }
    RETURN_TYPES = ("CONDITIONING", "CONDITIONING")
    RETURN_NAMES = ("positive", "negative")
    FUNCTION = "apply_controlnet"
    CATEGORY = "conditioning/controlnet"

    def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]):
        if strength == 0:
            return (positive, negative)

        control_hint = image.movedim(-1, 1)
        cnets = {}
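
        # Reuse one ControlNet copy per distinct previous controlnet so the positive
        # and negative conds that chain from the same control share an instance.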
        out = []
        for conditioning in [positive, negative]:
            c = []
            for t in conditioning:
                d = t[1].copy()

                prev_cnet = d.get('control', None)
                if prev_cnet in cnets:
                    c_net = cnets[prev_cnet]
                else:
                    c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat)
                    c_net.set_previous_controlnet(prev_cnet)
                    cnets[prev_cnet] = c_net

                d['control'] = c_net
                d['control_apply_to_uncond'] = False
                n = [t[0], d]
                c.append(n)
            out.append(c)
        return (out[0], out[1])

class UNETLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "unet_name": (folder_paths.get_filename_list("diffusion_models"), ),
                              "weight_dtype": (["default", "fp8_e4m3fn", "fp8_e4m3fn_fast", "fp8_e5m2"],)
                              }}
    RETURN_TYPES = ("MODEL",)
    FUNCTION = "load_unet"
    CATEGORY = "advanced/loaders"

    def load_unet(self, unet_name, weight_dtype):
        model_options = {}
        if weight_dtype == "fp8_e4m3fn":
            model_options["dtype"] = torch.float8_e4m3fn
        elif weight_dtype == "fp8_e4m3fn_fast":
            model_options["dtype"] = torch.float8_e4m3fn
            model_options["fp8_optimizations"] = True
        elif weight_dtype == "fp8_e5m2":
            model_options["dtype"] = torch.float8_e5m2

        unet_path = folder_paths.get_full_path_or_raise("diffusion_models", unet_name)
        model = comfy.sd.load_diffusion_model(unet_path, model_options=model_options)
        return (model,)

class CLIPLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_name": (folder_paths.get_filename_list("text_encoders"), ),
                              "type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv", "pixart", "cosmos", "lumina2", "wan", "hidream", "chroma", "ace"], ),
                              },
                "optional": {
                    "device": (["default", "cpu"], {"advanced": True}),
                }}
    RETURN_TYPES = ("CLIP",)
    FUNCTION = "load_clip"
    CATEGORY = "advanced/loaders"
    DESCRIPTION = "[Recipes]\n\nstable_diffusion: clip-l\nstable_cascade: clip-g\nsd3: t5 xxl / clip-g / clip-l\nstable_audio: t5 base\nmochi: t5 xxl\ncosmos: old t5 xxl\nlumina2: gemma 2 2B\nwan: umt5 xxl\nhidream: llama-3.1 (recommended) or t5"

    def load_clip(self, clip_name, type="stable_diffusion", device="default"):
        clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION)

        model_options = {}
        if device == "cpu":
            model_options["load_device"] = model_options["offload_device"] = torch.device("cpu")

        clip_path = folder_paths.get_full_path_or_raise("text_encoders", clip_name)
        clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options)
        return (clip,)

class DualCLIPLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_name1": (folder_paths.get_filename_list("text_encoders"), ),
                              "clip_name2": (folder_paths.get_filename_list("text_encoders"), ),
                              "type": (["sdxl", "sd3", "flux", "hunyuan_video", "hidream"], ),
                              },
                "optional": {
                    "device": (["default", "cpu"], {"advanced": True}),
                }}
    RETURN_TYPES = ("CLIP",)
    FUNCTION = "load_clip"
    CATEGORY = "advanced/loaders"
    DESCRIPTION = "[Recipes]\n\nsdxl: clip-l, clip-g\nsd3: clip-l, clip-g / clip-l, t5 / clip-g, t5\nflux: clip-l, t5\nhidream: at least one of t5 or llama, recommended t5 and llama"

    def load_clip(self, clip_name1, clip_name2, type, device="default"):
        clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION)

        clip_path1 = folder_paths.get_full_path_or_raise("text_encoders", clip_name1)
        clip_path2 = folder_paths.get_full_path_or_raise("text_encoders", clip_name2)

        model_options = {}
        if device == "cpu":
            model_options["load_device"] = model_options["offload_device"] = torch.device("cpu")

        clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options)
        return (clip,)

class CLIPVisionLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ),
                              }}
    RETURN_TYPES = ("CLIP_VISION",)
    FUNCTION = "load_clip"
    CATEGORY = "loaders"

    def load_clip(self, clip_name):
        clip_path = folder_paths.get_full_path_or_raise("clip_vision", clip_name)
        clip_vision = comfy.clip_vision.load(clip_path)
        if clip_vision is None:
            raise RuntimeError("ERROR: clip vision file is invalid and does not contain a valid vision model.")
        return (clip_vision,)

class CLIPVisionEncode:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "clip_vision": ("CLIP_VISION",),
                              "image": ("IMAGE",),
                              "crop": (["center", "none"],)
                              }}
    RETURN_TYPES = ("CLIP_VISION_OUTPUT",)
    FUNCTION = "encode"
    CATEGORY = "conditioning"

    def encode(self, clip_vision, image, crop):
        crop_image = True
        if crop != "center":
            crop_image = False
        output = clip_vision.encode_image(image, crop=crop_image)
        return (output,)

class StyleModelLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}}
    RETURN_TYPES = ("STYLE_MODEL",)
    FUNCTION = "load_style_model"
    CATEGORY = "loaders"

    def load_style_model(self, style_model_name):
        style_model_path = folder_paths.get_full_path_or_raise("style_models", style_model_name)
        style_model = comfy.sd.load_style_model(style_model_path)
        return (style_model,)

class StyleModelApply:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "style_model": ("STYLE_MODEL", ),
                             "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.001}),
                             "strength_type": (["multiply", "attn_bias"], ),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_stylemodel"
    CATEGORY = "conditioning/style_model"
    def apply_stylemodel(self, conditioning, style_model, clip_vision_output, strength, strength_type):
        cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0)
        if strength_type == "multiply":
            cond *= strength

        n = cond.shape[1]
        c_out = []
        for t in conditioning:
            (txt, keys) = t
            keys = keys.copy()
            # even if the strength is 1.0 (i.e., no change), if there's already a mask, we have to add to it
            if "attention_mask" in keys or (strength_type == "attn_bias" and strength != 1.0):
                # math.log raises an error if the argument is zero
                # torch.log returns -inf, which is what we want
                attn_bias = torch.log(torch.Tensor([strength if strength_type == "attn_bias" else 1.0]))
                # get the size of the mask image
                mask_ref_size = keys.get("attention_mask_img_shape", (1, 1))
                n_ref = mask_ref_size[0] * mask_ref_size[1]
                n_txt = txt.shape[1]
                # grab the existing mask
                mask = keys.get("attention_mask", None)
                # create a default mask if it doesn't exist
                if mask is None:
                    mask = torch.zeros((txt.shape[0], n_txt + n_ref, n_txt + n_ref), dtype=torch.float16)
                # convert the mask dtype, because it might be boolean
                # we want it to be interpreted as a bias
                if mask.dtype == torch.bool:
                    # log(True) = log(1) = 0
                    # log(False) = log(0) = -inf
                    mask = torch.log(mask.to(dtype=torch.float16))
                # now we make the mask bigger to add space for our new tokens
                new_mask = torch.zeros((txt.shape[0], n_txt + n + n_ref, n_txt + n + n_ref), dtype=torch.float16)
                # copy over the old mask, in quadrants
                new_mask[:, :n_txt, :n_txt] = mask[:, :n_txt, :n_txt]
                new_mask[:, :n_txt, n_txt+n:] = mask[:, :n_txt, n_txt:]
                new_mask[:, n_txt+n:, :n_txt] = mask[:, n_txt:, :n_txt]
                new_mask[:, n_txt+n:, n_txt+n:] = mask[:, n_txt:, n_txt:]
                # now fill in the attention bias for our redux tokens
                new_mask[:, :n_txt, n_txt:n_txt+n] = attn_bias
                new_mask[:, n_txt+n:, n_txt:n_txt+n] = attn_bias
                keys["attention_mask"] = new_mask.to(txt.device)
                keys["attention_mask_img_shape"] = mask_ref_size

            c_out.append([torch.cat((txt, cond), dim=1), keys])
        return (c_out,)

class unCLIPConditioning:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning": ("CONDITIONING", ),
                             "clip_vision_output": ("CLIP_VISION_OUTPUT", ),
                             "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}),
                             "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "apply_adm"
    CATEGORY = "conditioning"

    def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation):
        if strength == 0:
            return (conditioning, )

        c = node_helpers.conditioning_set_values(conditioning, {"unclip_conditioning": [{"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation}]}, append=True)
        return (c, )

class GLIGENLoader:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}}
    RETURN_TYPES = ("GLIGEN",)
    FUNCTION = "load_gligen"
    CATEGORY = "loaders"

    def load_gligen(self, gligen_name):
        gligen_path = folder_paths.get_full_path_or_raise("gligen", gligen_name)
        gligen = comfy.sd.load_gligen(gligen_path)
        return (gligen,)

class GLIGENTextBoxApply:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {"conditioning_to": ("CONDITIONING", ),
                             "clip": ("CLIP", ),
                             "gligen_textbox_model": ("GLIGEN", ),
                             "text": ("STRING", {"multiline": True, "dynamicPrompts": True}),
                             "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
                             "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}),
                             "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                             }}
    RETURN_TYPES = ("CONDITIONING",)
    FUNCTION = "append"
    CATEGORY = "conditioning/gligen"

    def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y):
        c = []
        cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), return_pooled="unprojected")
        for t in conditioning_to:
            n = [t[0], t[1].copy()]
            position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)]
            prev = []
            if "gligen" in n[1]:
                prev = n[1]['gligen'][2]
            n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params)
            c.append(n)
        return (c, )

class EmptyLatentImage:
    def __init__(self):
        self.device = comfy.model_management.intermediate_device()

    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}),
                "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}),
                "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."})
            }
        }
    RETURN_TYPES = ("LATENT",)
    OUTPUT_TOOLTIPS = ("The empty latent image batch.",)
    FUNCTION = "generate"
    CATEGORY = "latent"
    DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling."

    def generate(self, width, height, batch_size=1):
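        # 4 channels at 1/8 spatial resolution matches the SD1.x/SDXL latent layout;
        # models with other latent shapes get the channel count fixed up later by
        # comfy.sample.fix_empty_latent_channels in common_ksampler.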
        latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device)
        return ({"samples": latent}, )

class LatentFromBatch:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}),
                              "length": ("INT", {"default": 1, "min": 1, "max": 64}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "frombatch"
    CATEGORY = "latent/batch"

    def frombatch(self, samples, batch_index, length):
        s = samples.copy()
        s_in = samples["samples"]
        batch_index = min(s_in.shape[0] - 1, batch_index)
        length = min(s_in.shape[0] - batch_index, length)
        s["samples"] = s_in[batch_index:batch_index + length].clone()
        if "noise_mask" in samples:
            masks = samples["noise_mask"]
            if masks.shape[0] == 1:
                s["noise_mask"] = masks.clone()
            else:
                if masks.shape[0] < s_in.shape[0]:
                    masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
                s["noise_mask"] = masks[batch_index:batch_index + length].clone()
        if "batch_index" not in s:
            s["batch_index"] = [x for x in range(batch_index, batch_index + length)]
        else:
            s["batch_index"] = samples["batch_index"][batch_index:batch_index + length]
        return (s,)

class RepeatLatentBatch:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "amount": ("INT", {"default": 1, "min": 1, "max": 64}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "repeat"
    CATEGORY = "latent/batch"

    def repeat(self, samples, amount):
        s = samples.copy()
        s_in = samples["samples"]

        s["samples"] = s_in.repeat((amount, 1, 1, 1))
        if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1:
            masks = samples["noise_mask"]
            if masks.shape[0] < s_in.shape[0]:
                masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]]
            s["noise_mask"] = samples["noise_mask"].repeat((amount, 1, 1, 1))
        if "batch_index" in s:
            offset = max(s["batch_index"]) - min(s["batch_index"]) + 1
            s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]]
        return (s,)

class LatentUpscale:
    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]
    crop_methods = ["disabled", "center"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
                              "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "crop": (s.crop_methods,)}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "upscale"
    CATEGORY = "latent"

    def upscale(self, samples, upscale_method, width, height, crop):
        if width == 0 and height == 0:
            s = samples
        else:
            s = samples.copy()
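
            # A width or height of 0 keeps the input's aspect ratio on that axis.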
            if width == 0:
                height = max(64, height)
                width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2]))
            elif height == 0:
                width = max(64, width)
                height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1]))
            else:
                width = max(64, width)
                height = max(64, height)

            s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop)
        return (s,)

class LatentUpscaleBy:
    upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"]

    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
                              "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "upscale"
    CATEGORY = "latent"

    def upscale(self, samples, upscale_method, scale_by):
        s = samples.copy()
        width = round(samples["samples"].shape[-1] * scale_by)
        height = round(samples["samples"].shape[-2] * scale_by)
        s["samples"] = comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled")
        return (s,)

class LatentRotate:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "rotate"
    CATEGORY = "latent/transform"

    def rotate(self, samples, rotation):
        s = samples.copy()
        rotate_by = 0
        if rotation.startswith("90"):
            rotate_by = 1
        elif rotation.startswith("180"):
            rotate_by = 2
        elif rotation.startswith("270"):
            rotate_by = 3

        s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2])
        return (s,)

class LatentFlip:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "flip_method": (["x-axis: vertically", "y-axis: horizontally"],),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "flip"
    CATEGORY = "latent/transform"

    def flip(self, samples, flip_method):
        s = samples.copy()
        if flip_method.startswith("x"):
            s["samples"] = torch.flip(samples["samples"], dims=[2])
        elif flip_method.startswith("y"):
            s["samples"] = torch.flip(samples["samples"], dims=[3])
        return (s,)

class LatentComposite:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples_to": ("LATENT",),
                              "samples_from": ("LATENT",),
                              "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "composite"
    CATEGORY = "latent"

    def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0):
        x = x // 8
        y = y // 8
        feather = feather // 8
        samples_out = samples_to.copy()
        s = samples_to["samples"].clone()
        samples_to = samples_to["samples"]
        samples_from = samples_from["samples"]
        if feather == 0:
            s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
        else:
            samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x]
            mask = torch.ones_like(samples_from)
            for t in range(feather):
                if y != 0:
                    mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1))
                if y + samples_from.shape[2] < samples_to.shape[2]:
                    mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1))
                if x != 0:
                    mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1))
                if x + samples_from.shape[3] < samples_to.shape[3]:
                    mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1))
            rev_mask = torch.ones_like(mask) - mask
            s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask
        samples_out["samples"] = s
        return (samples_out,)

class LatentBlend:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": {
            "samples1": ("LATENT",),
            "samples2": ("LATENT",),
            "blend_factor": ("FLOAT", {
                "default": 0.5,
                "min": 0,
                "max": 1,
                "step": 0.01
            }),
        }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "blend"
    CATEGORY = "_for_testing"

    def blend(self, samples1, samples2, blend_factor: float, blend_mode: str = "normal"):
        samples_out = samples1.copy()
        samples1 = samples1["samples"]
        samples2 = samples2["samples"]
        if samples1.shape != samples2.shape:
            # Latents are already NCHW; resize samples2 spatially to match samples1.
            samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center')
        samples_blended = self.blend_mode(samples1, samples2, blend_mode)
        samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor)
        samples_out["samples"] = samples_blended
        return (samples_out,)

    def blend_mode(self, img1, img2, mode):
        if mode == "normal":
            return img2
        else:
            raise ValueError(f"Unsupported blend mode: {mode}")

class LatentCrop:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                              "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}),
                              "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "crop"
    CATEGORY = "latent/transform"

    def crop(self, samples, width, height, x, y):
        s = samples.copy()
        samples = samples['samples']
        x = x // 8
        y = y // 8

        # enforce a minimum crop size of 64 pixels (8 latent units)
        if x > (samples.shape[3] - 8):
            x = samples.shape[3] - 8
        if y > (samples.shape[2] - 8):
            y = samples.shape[2] - 8

        new_height = height // 8
        new_width = width // 8
        to_x = new_width + x
        to_y = new_height + y
        s['samples'] = samples[:,:,y:to_y, x:to_x]
        return (s,)

class SetLatentNoiseMask:
    @classmethod
    def INPUT_TYPES(s):
        return {"required": { "samples": ("LATENT",),
                              "mask": ("MASK",),
                              }}
    RETURN_TYPES = ("LATENT",)
    FUNCTION = "set_mask"
    CATEGORY = "latent/inpaint"

    def set_mask(self, samples, mask):
        s = samples.copy()
        s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1]))
        return (s,)

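# Shared sampling helper used by KSampler and KSamplerAdvanced: prepares seeded
# (optionally batch-indexed) noise, an optional noise mask, and a latent preview
# callback, then delegates to comfy.sample.sample.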
def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False):
    latent_image = latent["samples"]
    latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image)

    if disable_noise:
        noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu")
    else:
        batch_inds = latent["batch_index"] if "batch_index" in latent else None
        noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds)

    noise_mask = None
    if "noise_mask" in latent:
        noise_mask = latent["noise_mask"]

    callback = latent_preview.prepare_callback(model, steps)
    disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED
    samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image,
                                  denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step,
                                  force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed)
    out = latent.copy()
    out["samples"] = samples
    return (out, )

class KSampler:
    @classmethod
    def INPUT_TYPES(s):
        return {
            "required": {
                "model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}),
                "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True, "tooltip": "The random seed used for creating the noise."}),
                "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}),
                "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. Higher values result in images more closely matching the prompt; however, values that are too high will negatively impact quality."}),
                "sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling; this can affect the quality, speed, and style of the generated output."}),
                "scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}),
                "positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}),
                "negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}),
                "latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}),
                "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied; lower values will maintain the structure of the initial image, allowing for image-to-image sampling."}),
            }
        }
    RETURN_TYPES = ("LATENT",)
    OUTPUT_TOOLTIPS = ("The denoised latent.",)
    FUNCTION = "sample"
    CATEGORY = "sampling"
    DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image."

    def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
        return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise)

class KSamplerAdvanced:
@classmethod
def INPUT_TYPES(s):
return {"required":
{"model": ("MODEL",),
"add_noise": (["enable", "disable"], ),
"noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True}),
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}),
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
"positive": ("CONDITIONING", ),
"negative": ("CONDITIONING", ),
"latent_image": ("LATENT", ),
"start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}),
"end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}),
"return_with_leftover_noise": (["disable", "enable"], ),
}
}
RETURN_TYPES = ("LATENT",)
FUNCTION = "sample"
CATEGORY = "sampling"
def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0):
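# map the "enable"/"disable" widget strings onto the booleans common_ksampler expects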
force_full_denoise = True
if return_with_leftover_noise == "enable":
force_full_denoise = False
disable_noise = False
if add_noise == "disable":
disable_noise = True
return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise)
class SaveImage:
def __init__(self):
self.output_dir = folder_paths.get_output_directory()
self.type = "output"
self.prefix_append = ""
self.compress_level = 4
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"images": ("IMAGE", {"tooltip": "The images to save."}),
"filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."})
},
"hidden": {
"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"
},
}
RETURN_TYPES = ()
FUNCTION = "save_images"
OUTPUT_NODE = True
CATEGORY = "image"
DESCRIPTION = "Saves the input images to your ComfyUI output directory."
def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None):
filename_prefix += self.prefix_append
full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0])
results = list()
for (batch_number, image) in enumerate(images):
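# scale the 0..1 float tensor (H, W, C) to 0..255 and convert to an 8-bit PIL image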
i = 255. * image.cpu().numpy()
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
metadata = None
if not args.disable_metadata:
metadata = PngInfo()
if prompt is not None:
metadata.add_text("prompt", json.dumps(prompt))
if extra_pnginfo is not None:
for x in extra_pnginfo:
metadata.add_text(x, json.dumps(extra_pnginfo[x]))
filename_with_batch_num = filename.replace("%batch_num%", str(batch_number))
file = f"{filename_with_batch_num}_{counter:05}_.png"
img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level)
results.append({
"filename": file,
"subfolder": subfolder,
"type": self.type
})
counter += 1
return { "ui": { "images": results } }
class PreviewImage(SaveImage):
def __init__(self):
self.output_dir = folder_paths.get_temp_directory()
self.type = "temp"
self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(5))
self.compress_level = 1
@classmethod
def INPUT_TYPES(s):
return {"required":
{"images": ("IMAGE", ), },
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
}
class LoadImage:
@classmethod
def INPUT_TYPES(s):
input_dir = folder_paths.get_input_directory()
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
files = folder_paths.filter_files_content_types(files, ["image"])
return {"required":
{"image": (sorted(files), {"image_upload": True})},
}
CATEGORY = "image"
RETURN_TYPES = ("IMAGE", "MASK")
FUNCTION = "load_image"
def load_image(self, image):
image_path = folder_paths.get_annotated_filepath(image)
img = node_helpers.pillow(Image.open, image_path)
output_images = []
output_masks = []
w, h = None, None
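# MPO files store multiple views of the same photo, so only the first frame is kept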
excluded_formats = ['MPO']
for i in ImageSequence.Iterator(img):
i = node_helpers.pillow(ImageOps.exif_transpose, i)
if i.mode == 'I':
i = i.point(lambda v: v * (1 / 255))
image = i.convert("RGB")
if len(output_images) == 0:
w = image.size[0]
h = image.size[1]
if image.size[0] != w or image.size[1] != h:
continue
image = np.array(image).astype(np.float32) / 255.0
image = torch.from_numpy(image)[None,]
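# ComfyUI masks mark the area to regenerate, so the alpha channel is inverted (fully transparent = 1.0)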
if 'A' in i.getbands():
mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0
mask = 1. - torch.from_numpy(mask)
elif i.mode == 'P' and 'transparency' in i.info:
mask = np.array(i.convert('RGBA').getchannel('A')).astype(np.float32) / 255.0
mask = 1. - torch.from_numpy(mask)
else:
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
output_images.append(image)
output_masks.append(mask.unsqueeze(0))
if len(output_images) > 1 and img.format not in excluded_formats:
output_image = torch.cat(output_images, dim=0)
output_mask = torch.cat(output_masks, dim=0)
else:
output_image = output_images[0]
output_mask = output_masks[0]
return (output_image, output_mask)
@classmethod
def IS_CHANGED(s, image):
image_path = folder_paths.get_annotated_filepath(image)
m = hashlib.sha256()
with open(image_path, 'rb') as f:
m.update(f.read())
return m.digest().hex()
@classmethod
def VALIDATE_INPUTS(s, image):
if not folder_paths.exists_annotated_filepath(image):
return "Invalid image file: {}".format(image)
return True
class LoadImageMask:
_color_channels = ["alpha", "red", "green", "blue"]
@classmethod
def INPUT_TYPES(s):
input_dir = folder_paths.get_input_directory()
files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
return {"required":
{"image": (sorted(files), {"image_upload": True}),
"channel": (s._color_channels, ), }
}
CATEGORY = "mask"
RETURN_TYPES = ("MASK",)
FUNCTION = "load_image"
def load_image(self, image, channel):
image_path = folder_paths.get_annotated_filepath(image)
i = node_helpers.pillow(Image.open, image_path)
i = node_helpers.pillow(ImageOps.exif_transpose, i)
if i.getbands() != ("R", "G", "B", "A"):
if i.mode == 'I':
i = i.point(lambda v: v * (1 / 255))
i = i.convert("RGBA")
mask = None
c = channel[0].upper()
if c in i.getbands():
mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0
mask = torch.from_numpy(mask)
if c == 'A':
mask = 1. - mask
else:
mask = torch.zeros((64,64), dtype=torch.float32, device="cpu")
return (mask.unsqueeze(0),)
@classmethod
def IS_CHANGED(s, image, channel):
image_path = folder_paths.get_annotated_filepath(image)
m = hashlib.sha256()
with open(image_path, 'rb') as f:
m.update(f.read())
return m.digest().hex()
@classmethod
def VALIDATE_INPUTS(s, image):
if not folder_paths.exists_annotated_filepath(image):
return "Invalid image file: {}".format(image)
return True
class LoadImageOutput(LoadImage):
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("COMBO", {
"image_upload": True,
"image_folder": "output",
"remote": {
"route": "/internal/files/output",
"refresh_button": True,
"control_after_refresh": "first",
},
}),
}
}
DESCRIPTION = "Load an image from the output folder. When the refresh button is clicked, the node will update the image list and automatically select the first image, allowing for easy iteration."
EXPERIMENTAL = True
FUNCTION = "load_image"
class ImageScale:
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
crop_methods = ["disabled", "center"]
@classmethod
def INPUT_TYPES(s):
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
"crop": (s.crop_methods,)}}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "upscale"
CATEGORY = "image/upscaling"
def upscale(self, image, upscale_method, width, height, crop):
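# a width or height of 0 keeps the aspect ratio; if both are 0 the image passes through unchanged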
if width == 0 and height == 0:
s = image
else:
samples = image.movedim(-1,1)
if width == 0:
width = max(1, round(samples.shape[3] * height / samples.shape[2]))
elif height == 0:
height = max(1, round(samples.shape[2] * width / samples.shape[3]))
s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop)
s = s.movedim(1,-1)
return (s,)
class ImageScaleBy:
upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"]
@classmethod
def INPUT_TYPES(s):
return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,),
"scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "upscale"
CATEGORY = "image/upscaling"
def upscale(self, image, upscale_method, scale_by):
samples = image.movedim(-1,1)
width = round(samples.shape[3] * scale_by)
height = round(samples.shape[2] * scale_by)
s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled")
s = s.movedim(1,-1)
return (s,)
class ImageInvert:
@classmethod
def INPUT_TYPES(s):
return {"required": { "image": ("IMAGE",)}}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "invert"
CATEGORY = "image"
def invert(self, image):
s = 1.0 - image
return (s,)
class ImageBatch:
@classmethod
def INPUT_TYPES(s):
return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "batch"
CATEGORY = "image"
def batch(self, image1, image2):
if image1.shape[1:] != image2.shape[1:]:
image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1)
s = torch.cat((image1, image2), dim=0)
return (s,)
class EmptyImage:
def __init__(self, device="cpu"):
self.device = device
@classmethod
def INPUT_TYPES(s):
return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
"height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}),
"batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}),
"color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}),
}}
RETURN_TYPES = ("IMAGE",)
FUNCTION = "generate"
CATEGORY = "image"
def generate(self, width, height, batch_size=1, color=0):
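# unpack the 0xRRGGBB integer into per-channel planes in 0..1 and stack them into an RGB image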
r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF)
g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF)
b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF)
return (torch.cat((r, g, b), dim=-1), )
class ImagePadForOutpaint:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"image": ("IMAGE",),
"left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
"top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
"right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
"bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}),
"feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}),
}
}
RETURN_TYPES = ("IMAGE", "MASK")
FUNCTION = "expand_image"
CATEGORY = "image"
def expand_image(self, image, left, top, right, bottom, feathering):
d1, d2, d3, d4 = image.size()
new_image = torch.ones(
(d1, d2 + top + bottom, d3 + left + right, d4),
dtype=torch.float32,
) * 0.5
new_image[:, top:top + d2, left:left + d3, :] = image
mask = torch.ones(
(d2 + top + bottom, d3 + left + right),
dtype=torch.float32,
)
t = torch.zeros(
(d2, d3),
dtype=torch.float32
)
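# feather: quadratic falloff of the mask from each padded edge back into the original image, over `feathering` pixels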
if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3:
for i in range(d2):
for j in range(d3):
dt = i if top != 0 else d2
db = d2 - i if bottom != 0 else d2
dl = j if left != 0 else d3
dr = d3 - j if right != 0 else d3
d = min(dt, db, dl, dr)
if d >= feathering:
continue
v = (feathering - d) / feathering
t[i, j] = v * v
mask[top:top + d2, left:left + d3] = t
return (new_image, mask.unsqueeze(0))
NODE_CLASS_MAPPINGS = {
"KSampler": KSampler,
"CheckpointLoaderSimple": CheckpointLoaderSimple,
"CLIPTextEncode": CLIPTextEncode,
"CLIPSetLastLayer": CLIPSetLastLayer,
"VAEDecode": VAEDecode,
"VAEEncode": VAEEncode,
"VAEEncodeForInpaint": VAEEncodeForInpaint,
"VAELoader": VAELoader,
"EmptyLatentImage": EmptyLatentImage,
"LatentUpscale": LatentUpscale,
"LatentUpscaleBy": LatentUpscaleBy,
"LatentFromBatch": LatentFromBatch,
"RepeatLatentBatch": RepeatLatentBatch,
"SaveImage": SaveImage,
"PreviewImage": PreviewImage,
"LoadImage": LoadImage,
"LoadImageMask": LoadImageMask,
"LoadImageOutput": LoadImageOutput,
"ImageScale": ImageScale,
"ImageScaleBy": ImageScaleBy,
"ImageInvert": ImageInvert,
"ImageBatch": ImageBatch,
"ImagePadForOutpaint": ImagePadForOutpaint,
"EmptyImage": EmptyImage,
"ConditioningAverage": ConditioningAverage ,
"ConditioningCombine": ConditioningCombine,
"ConditioningConcat": ConditioningConcat,
"ConditioningSetArea": ConditioningSetArea,
"ConditioningSetAreaPercentage": ConditioningSetAreaPercentage,
"ConditioningSetAreaStrength": ConditioningSetAreaStrength,
"ConditioningSetMask": ConditioningSetMask,
"KSamplerAdvanced": KSamplerAdvanced,
"SetLatentNoiseMask": SetLatentNoiseMask,
"LatentComposite": LatentComposite,
"LatentBlend": LatentBlend,
"LatentRotate": LatentRotate,
"LatentFlip": LatentFlip,
"LatentCrop": LatentCrop,
"LoraLoader": LoraLoader,
"CLIPLoader": CLIPLoader,
"UNETLoader": UNETLoader,
"DualCLIPLoader": DualCLIPLoader,
"CLIPVisionEncode": CLIPVisionEncode,
"StyleModelApply": StyleModelApply,
"unCLIPConditioning": unCLIPConditioning,
"ControlNetApply": ControlNetApply,
"ControlNetApplyAdvanced": ControlNetApplyAdvanced,
"ControlNetLoader": ControlNetLoader,
"DiffControlNetLoader": DiffControlNetLoader,
"StyleModelLoader": StyleModelLoader,
"CLIPVisionLoader": CLIPVisionLoader,
"VAEDecodeTiled": VAEDecodeTiled,
"VAEEncodeTiled": VAEEncodeTiled,
"unCLIPCheckpointLoader": unCLIPCheckpointLoader,
"GLIGENLoader": GLIGENLoader,
"GLIGENTextBoxApply": GLIGENTextBoxApply,
"InpaintModelConditioning": InpaintModelConditioning,
"CheckpointLoader": CheckpointLoader,
"DiffusersLoader": DiffusersLoader,
"LoadLatent": LoadLatent,
"SaveLatent": SaveLatent,
"ConditioningZeroOut": ConditioningZeroOut,
"ConditioningSetTimestepRange": ConditioningSetTimestepRange,
"LoraLoaderModelOnly": LoraLoaderModelOnly,
}
NODE_DISPLAY_NAME_MAPPINGS = {
# Sampling
"KSampler": "KSampler",
"KSamplerAdvanced": "KSampler (Advanced)",
# Loaders
"CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)",
"CheckpointLoaderSimple": "Load Checkpoint",
"VAELoader": "Load VAE",
"LoraLoader": "Load LoRA",
"CLIPLoader": "Load CLIP",
"ControlNetLoader": "Load ControlNet Model",
"DiffControlNetLoader": "Load ControlNet Model (diff)",
"StyleModelLoader": "Load Style Model",
"CLIPVisionLoader": "Load CLIP Vision",
"UpscaleModelLoader": "Load Upscale Model",
"UNETLoader": "Load Diffusion Model",
# Conditioning
"CLIPVisionEncode": "CLIP Vision Encode",
"StyleModelApply": "Apply Style Model",
"CLIPTextEncode": "CLIP Text Encode (Prompt)",
"CLIPSetLastLayer": "CLIP Set Last Layer",
"ConditioningCombine": "Conditioning (Combine)",
"ConditioningAverage ": "Conditioning (Average)",
"ConditioningConcat": "Conditioning (Concat)",
"ConditioningSetArea": "Conditioning (Set Area)",
"ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)",
"ConditioningSetMask": "Conditioning (Set Mask)",
"ControlNetApply": "Apply ControlNet (OLD)",
"ControlNetApplyAdvanced": "Apply ControlNet",
# Latent
"VAEEncodeForInpaint": "VAE Encode (for Inpainting)",
"SetLatentNoiseMask": "Set Latent Noise Mask",
"VAEDecode": "VAE Decode",
"VAEEncode": "VAE Encode",
"LatentRotate": "Rotate Latent",
"LatentFlip": "Flip Latent",
"LatentCrop": "Crop Latent",
"EmptyLatentImage": "Empty Latent Image",
"LatentUpscale": "Upscale Latent",
"LatentUpscaleBy": "Upscale Latent By",
"LatentComposite": "Latent Composite",
"LatentBlend": "Latent Blend",
"LatentFromBatch" : "Latent From Batch",
"RepeatLatentBatch": "Repeat Latent Batch",
# Image
"SaveImage": "Save Image",
"PreviewImage": "Preview Image",
"LoadImage": "Load Image",
"LoadImageMask": "Load Image (as Mask)",
"LoadImageOutput": "Load Image (from Outputs)",
"ImageScale": "Upscale Image",
"ImageScaleBy": "Upscale Image By",
"ImageUpscaleWithModel": "Upscale Image (using Model)",
"ImageInvert": "Invert Image",
"ImagePadForOutpaint": "Pad Image for Outpainting",
"ImageBatch": "Batch Images",
"ImageCrop": "Image Crop",
"ImageBlend": "Image Blend",
"ImageBlur": "Image Blur",
"ImageQuantize": "Image Quantize",
"ImageSharpen": "Image Sharpen",
"ImageScaleToTotalPixels": "Scale Image to Total Pixels",
# _for_testing
"VAEDecodeTiled": "VAE Decode (Tiled)",
"VAEEncodeTiled": "VAE Encode (Tiled)",
}
EXTENSION_WEB_DIRS = {}
# Dictionary of successfully loaded module names and associated directories.
LOADED_MODULE_DIRS = {}
def get_module_name(module_path: str) -> str:
"""
Returns the module name based on the given module path.
Examples:
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.py") -> "my_custom_node"
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node") -> "my_custom_node"
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/") -> "my_custom_node"
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__.py") -> "my_custom_node"
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__") -> "my_custom_node"
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__/") -> "my_custom_node"
get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.disabled") -> "custom_nodes
Args:
module_path (str): The path of the module.
Returns:
str: The module name.
"""
base_path = os.path.basename(module_path)
if os.path.isfile(module_path):
base_path = os.path.splitext(base_path)[0]
return base_path
def load_custom_node(module_path: str, ignore=set(), module_parent="custom_nodes") -> bool:
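"""
Imports a single custom node module from either a .py file or a package directory,
registers its NODE_CLASS_MAPPINGS (skipping any names in `ignore`), merges its
NODE_DISPLAY_NAME_MAPPINGS, and records an optional WEB_DIRECTORY for frontend assets.
Returns True on success, False when the import fails or the module defines no
NODE_CLASS_MAPPINGS.
"""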
module_name = get_module_name(module_path)
if os.path.isfile(module_path):
sp = os.path.splitext(module_path)
module_name = sp[0]
sys_module_name = module_name
elif os.path.isdir(module_path):
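# dots in a directory name would be parsed as package separators, so mangle them for sys.modules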
sys_module_name = module_path.replace(".", "_x_")
try:
logging.debug("Trying to load custom node {}".format(module_path))
if os.path.isfile(module_path):
module_spec = importlib.util.spec_from_file_location(sys_module_name, module_path)
module_dir = os.path.split(module_path)[0]
else:
module_spec = importlib.util.spec_from_file_location(sys_module_name, os.path.join(module_path, "__init__.py"))
module_dir = module_path
module = importlib.util.module_from_spec(module_spec)
sys.modules[sys_module_name] = module
module_spec.loader.exec_module(module)
LOADED_MODULE_DIRS[module_name] = os.path.abspath(module_dir)
if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None:
web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY")))
if os.path.isdir(web_dir):
EXTENSION_WEB_DIRS[module_name] = web_dir
if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None:
for name, node_cls in module.NODE_CLASS_MAPPINGS.items():
if name not in ignore:
NODE_CLASS_MAPPINGS[name] = node_cls
node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path))
if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None:
NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS)
return True
else:
logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.")
return False
except Exception as e:
logging.warning(traceback.format_exc())
logging.warning(f"Cannot import {module_path} module for custom nodes: {e}")
return False
def init_external_custom_nodes():
"""
Initializes the external custom nodes.
This function loads custom nodes from the specified folder paths and imports them into the application.
It measures the import times for each custom node and logs the results.
Returns:
None
"""
base_node_names = set(NODE_CLASS_MAPPINGS.keys())
node_paths = folder_paths.get_folder_paths("custom_nodes")
node_import_times = []
for custom_node_path in node_paths:
possible_modules = os.listdir(os.path.realpath(custom_node_path))
if "__pycache__" in possible_modules:
possible_modules.remove("__pycache__")
for possible_module in possible_modules:
module_path = os.path.join(custom_node_path, possible_module)
if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue
if module_path.endswith(".disabled"): continue
time_before = time.perf_counter()
success = load_custom_node(module_path, base_node_names, module_parent="custom_nodes")
node_import_times.append((time.perf_counter() - time_before, module_path, success))
if len(node_import_times) > 0:
logging.info("\nImport times for custom nodes:")
for n in sorted(node_import_times):
if n[2]:
import_message = ""
else:
import_message = " (IMPORT FAILED)"
logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1]))
logging.info("")
def init_builtin_extra_nodes():
"""
Initializes the built-in extra nodes in ComfyUI.
This function loads the extra node files located in the "comfy_extras" directory and imports them into ComfyUI.
Returns:
The list of extra node file names that failed to import; the caller logs any failures.
"""
extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras")
extras_files = [
"nodes_latent.py",
"nodes_hypernetwork.py",
"nodes_upscale_model.py",
"nodes_post_processing.py",
"nodes_mask.py",
"nodes_compositing.py",
"nodes_rebatch.py",
"nodes_model_merging.py",
"nodes_tomesd.py",
"nodes_clip_sdxl.py",
"nodes_canny.py",
"nodes_freelunch.py",
"nodes_custom_sampler.py",
"nodes_hypertile.py",
"nodes_model_advanced.py",
"nodes_model_downscale.py",
"nodes_images.py",
"nodes_video_model.py",
"nodes_sag.py",
"nodes_perpneg.py",
"nodes_stable3d.py",
"nodes_sdupscale.py",
"nodes_photomaker.py",
"nodes_pixart.py",
"nodes_cond.py",
"nodes_morphology.py",
"nodes_stable_cascade.py",
"nodes_differential_diffusion.py",
"nodes_ip2p.py",
"nodes_model_merging_model_specific.py",
"nodes_pag.py",
"nodes_align_your_steps.py",
"nodes_attention_multiply.py",
"nodes_advanced_samplers.py",
"nodes_webcam.py",
"nodes_audio.py",
"nodes_sd3.py",
"nodes_gits.py",
"nodes_controlnet.py",
"nodes_hunyuan.py",
"nodes_flux.py",
"nodes_lora_extract.py",
"nodes_torch_compile.py",
"nodes_mochi.py",
"nodes_slg.py",
"nodes_mahiro.py",
"nodes_lt.py",
"nodes_hooks.py",
"nodes_load_3d.py",
"nodes_cosmos.py",
"nodes_video.py",
"nodes_lumina2.py",
"nodes_wan.py",
"nodes_lotus.py",
"nodes_hunyuan3d.py",
"nodes_primitive.py",
"nodes_cfg.py",
"nodes_optimalsteps.py",
"nodes_hidream.py",
"nodes_fresca.py",
"nodes_apg.py",
"nodes_preview_any.py",
"nodes_ace.py",
"nodes_string.py",
"nodes_camera_trajectory.py",
]
import_failed = []
for node_file in extras_files:
if not load_custom_node(os.path.join(extras_dir, node_file), module_parent="comfy_extras"):
import_failed.append(node_file)
return import_failed
def init_builtin_api_nodes():
api_nodes_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_api_nodes")
api_nodes_files = [
"nodes_ideogram.py",
"nodes_openai.py",
"nodes_minimax.py",
"nodes_veo2.py",
"nodes_kling.py",
"nodes_bfl.py",
"nodes_luma.py",
"nodes_recraft.py",
"nodes_pixverse.py",
"nodes_stability.py",
"nodes_pika.py",
"nodes_runway.py",
"nodes_tripo.py",
"nodes_rodin.py",
"nodes_gemini.py",
]
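# canary.py is loaded first as a gate: if it cannot be imported, all API nodes are skipped and reported as failed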
if not load_custom_node(os.path.join(api_nodes_dir, "canary.py"), module_parent="comfy_api_nodes"):
return api_nodes_files
import_failed = []
for node_file in api_nodes_files:
if not load_custom_node(os.path.join(api_nodes_dir, node_file), module_parent="comfy_api_nodes"):
import_failed.append(node_file)
return import_failed
def init_extra_nodes(init_custom_nodes=True, init_api_nodes=True):
import_failed = init_builtin_extra_nodes()
import_failed_api = []
if init_api_nodes:
import_failed_api = init_builtin_api_nodes()
if init_custom_nodes:
init_external_custom_nodes()
else:
logging.info("Skipping loading of custom nodes")
def report_import_failures(failed, module_dir):
if len(failed) == 0:
return
logging.warning("WARNING: some {}/ nodes did not import correctly. This may be because they are missing some dependencies.\n".format(module_dir))
for node in failed:
logging.warning("IMPORT FAILED: {}".format(node))
logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.")
if args.windows_standalone_build:
logging.warning("Please run the update script: update/update_comfyui.bat")
else:
logging.warning("Please do a: pip install -r requirements.txt")
logging.warning("")
report_import_failures(import_failed_api, "comfy_api_nodes")
report_import_failures(import_failed, "comfy_extras")
return import_failed