mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2025-01-25 15:55:18 +00:00
9baa48cb33
LoadImage.IS_CHANGED returns the hash of the image so it will execute again if the image changed on the disk.
254 lines
8.7 KiB
Python
254 lines
8.7 KiB
Python
import torch
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import hashlib
|
|
|
|
from PIL import Image
|
|
from PIL.PngImagePlugin import PngInfo
|
|
import numpy as np
|
|
|
|
sys.path.append(os.path.join(sys.path[0], "comfy"))
|
|
|
|
|
|
import comfy.samplers
|
|
import comfy.sd
|
|
|
|
supported_ckpt_extensions = ['.ckpt']
|
|
try:
|
|
import safetensors.torch
|
|
supported_ckpt_extensions += ['.safetensors']
|
|
except:
|
|
print("Could not import safetensors, safetensors support disabled.")
|
|
|
|
def filter_files_extensions(files, extensions):
|
|
return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions, files)))
|
|
|
|
class CLIPTextEncode:
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": {"text": ("STRING", ), "clip": ("CLIP", )}}
|
|
RETURN_TYPES = ("CONDITIONING",)
|
|
FUNCTION = "encode"
|
|
|
|
def encode(self, clip, text):
|
|
return (clip.encode(text), )
|
|
|
|
class VAEDecode:
|
|
def __init__(self, device="cpu"):
|
|
self.device = device
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT", ), "vae": ("VAE", )}}
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "decode"
|
|
|
|
def decode(self, vae, samples):
|
|
return (vae.decode(samples), )
|
|
|
|
class VAEEncode:
|
|
def __init__(self, device="cpu"):
|
|
self.device = device
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "encode"
|
|
|
|
def encode(self, vae, pixels):
|
|
x = (pixels.shape[1] // 64) * 64
|
|
y = (pixels.shape[2] // 64) * 64
|
|
if pixels.shape[1] != x or pixels.shape[2] != y:
|
|
pixels = pixels[:,:x,:y,:]
|
|
return (vae.encode(pixels), )
|
|
|
|
class CheckpointLoader:
|
|
models_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "models")
|
|
config_dir = os.path.join(models_dir, "configs")
|
|
ckpt_dir = os.path.join(models_dir, "checkpoints")
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "config_name": (filter_files_extensions(os.listdir(s.config_dir), '.yaml'), ),
|
|
"ckpt_name": (filter_files_extensions(os.listdir(s.ckpt_dir), supported_ckpt_extensions), )}}
|
|
RETURN_TYPES = ("MODEL", "CLIP", "VAE")
|
|
FUNCTION = "load_checkpoint"
|
|
|
|
def load_checkpoint(self, config_name, ckpt_name, output_vae=True, output_clip=True):
|
|
config_path = os.path.join(self.config_dir, config_name)
|
|
ckpt_path = os.path.join(self.ckpt_dir, ckpt_name)
|
|
return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True)
|
|
|
|
class VAELoader:
|
|
models_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "models")
|
|
vae_dir = os.path.join(models_dir, "vae")
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "vae_name": (filter_files_extensions(os.listdir(s.vae_dir), supported_ckpt_extensions), )}}
|
|
RETURN_TYPES = ("VAE",)
|
|
FUNCTION = "load_vae"
|
|
|
|
#TODO: scale factor?
|
|
def load_vae(self, vae_name):
|
|
vae_path = os.path.join(self.vae_dir, vae_name)
|
|
vae = comfy.sd.VAE(ckpt_path=vae_path)
|
|
return (vae,)
|
|
|
|
class EmptyLatentImage:
|
|
def __init__(self, device="cpu"):
|
|
self.device = device
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "width": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}),
|
|
"height": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}),
|
|
"batch_size": ("INT", {"default": 1, "min": 1, "max": 64})}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "generate"
|
|
|
|
def generate(self, width, height, batch_size=1):
|
|
latent = torch.zeros([batch_size, 4, height // 8, width // 8])
|
|
return (latent, )
|
|
|
|
class LatentUpscale:
|
|
upscale_methods = ["nearest-exact", "bilinear", "area"]
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,),
|
|
"width": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}),
|
|
"height": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}),}}
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "upscale"
|
|
|
|
def upscale(self, samples, upscale_method, width, height):
|
|
s = torch.nn.functional.interpolate(samples, size=(height // 8, width // 8), mode=upscale_method)
|
|
return (s,)
|
|
|
|
class KSampler:
|
|
def __init__(self, device="cuda"):
|
|
self.device = device
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":
|
|
{"model": ("MODEL",),
|
|
"seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff}),
|
|
"steps": ("INT", {"default": 20, "min": 1, "max": 10000}),
|
|
"cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}),
|
|
"sampler_name": (comfy.samplers.KSampler.SAMPLERS, ),
|
|
"scheduler": (comfy.samplers.KSampler.SCHEDULERS, ),
|
|
"positive": ("CONDITIONING", ),
|
|
"negative": ("CONDITIONING", ),
|
|
"latent_image": ("LATENT", ),
|
|
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}),
|
|
}}
|
|
|
|
RETURN_TYPES = ("LATENT",)
|
|
FUNCTION = "sample"
|
|
|
|
def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0):
|
|
noise = torch.randn(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, generator=torch.manual_seed(seed), device="cpu")
|
|
model = model.to(self.device)
|
|
noise = noise.to(self.device)
|
|
latent_image = latent_image.to(self.device)
|
|
|
|
if positive.shape[0] < noise.shape[0]:
|
|
positive = torch.cat([positive] * noise.shape[0])
|
|
|
|
if negative.shape[0] < noise.shape[0]:
|
|
negative = torch.cat([negative] * noise.shape[0])
|
|
|
|
positive = positive.to(self.device)
|
|
negative = negative.to(self.device)
|
|
|
|
if sampler_name in comfy.samplers.KSampler.SAMPLERS:
|
|
sampler = comfy.samplers.KSampler(model, steps=steps, device=self.device, sampler=sampler_name, scheduler=scheduler, denoise=denoise)
|
|
else:
|
|
#other samplers
|
|
pass
|
|
|
|
samples = sampler.sample(noise, positive, negative, cfg=cfg, latent_image=latent_image)
|
|
samples = samples.cpu()
|
|
model = model.cpu()
|
|
return (samples, )
|
|
|
|
|
|
class SaveImage:
|
|
def __init__(self):
|
|
self.output_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output")
|
|
try:
|
|
self.counter = int(max(filter(lambda a: 'ComfyUI_' in a, os.listdir(self.output_dir))).split('_')[1]) + 1
|
|
except:
|
|
self.counter = 1
|
|
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":
|
|
{"images": ("IMAGE", )},
|
|
"hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"},
|
|
}
|
|
|
|
RETURN_TYPES = ()
|
|
FUNCTION = "save_images"
|
|
|
|
OUTPUT_NODE = True
|
|
|
|
def save_images(self, images, prompt=None, extra_pnginfo=None):
|
|
for image in images:
|
|
i = 255. * image.cpu().numpy()
|
|
img = Image.fromarray(i.astype(np.uint8))
|
|
metadata = PngInfo()
|
|
if prompt is not None:
|
|
metadata.add_text("prompt", json.dumps(prompt))
|
|
if extra_pnginfo is not None:
|
|
for x in extra_pnginfo:
|
|
metadata.add_text(x, json.dumps(extra_pnginfo[x]))
|
|
img.save(f"output/ComfyUI_{self.counter:05}_.png", pnginfo=metadata, optimize=True)
|
|
self.counter += 1
|
|
|
|
class LoadImage:
|
|
input_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "input")
|
|
@classmethod
|
|
def INPUT_TYPES(s):
|
|
return {"required":
|
|
{"image": (os.listdir(s.input_dir), )},
|
|
}
|
|
|
|
RETURN_TYPES = ("IMAGE",)
|
|
FUNCTION = "load_image"
|
|
def load_image(self, image):
|
|
image_path = os.path.join(self.input_dir, image)
|
|
image = Image.open(image_path).convert("RGB")
|
|
image = np.array(image).astype(np.float32) / 255.0
|
|
image = torch.from_numpy(image[None])[None,]
|
|
return image
|
|
|
|
@classmethod
|
|
def IS_CHANGED(s, image):
|
|
image_path = os.path.join(s.input_dir, image)
|
|
m = hashlib.sha256()
|
|
with open(image_path, 'rb') as f:
|
|
m.update(f.read())
|
|
return m.digest().hex()
|
|
|
|
|
|
|
|
NODE_CLASS_MAPPINGS = {
|
|
"KSampler": KSampler,
|
|
"CheckpointLoader": CheckpointLoader,
|
|
"CLIPTextEncode": CLIPTextEncode,
|
|
"VAEDecode": VAEDecode,
|
|
"VAEEncode": VAEEncode,
|
|
"VAELoader": VAELoader,
|
|
"EmptyLatentImage": EmptyLatentImage,
|
|
"LatentUpscale": LatentUpscale,
|
|
"SaveImage": SaveImage,
|
|
"LoadImage": LoadImage
|
|
}
|
|
|
|
|