"workflow update"

This commit is contained in:
qiong-yue 2024-09-20 15:15:11 +05:00
parent 38c69080c7
commit 40ec215291
4 changed files with 484 additions and 0 deletions

114
test_prompt/app.py Normal file
View File

@@ -0,0 +1,114 @@
#This is an example that uses the websockets api and the SaveImageWebsocket node to get images directly without
#them being saved to disk
import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import urllib.request
import urllib.parse
import time
import requests
import random
import sys
import os
# Add the parent directory to sys.path so sibling packages (e.g. ECCV2022_RIFE) resolve.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Directory containing this script.
script_dir = os.path.dirname(os.path.abspath(__file__))
# Path of the ComfyUI root directory, one level above this script.
comfyui_dir = os.path.abspath(os.path.join(script_dir, '..'))
from ECCV2022_RIFE.InterpolatorInterface import InterpolatorInterface
from generate_prompt import generate_prompt
# ComfyUI server endpoint (host:port) used by every HTTP and websocket call below.
server_address = "127.0.0.1:8188"
# Random per-run client id so the server can route websocket messages to this session.
client_id = str(uuid.uuid4())
def queue_prompt(prompt):
    """POST a workflow *prompt* to the ComfyUI server; return the parsed JSON reply."""
    payload = json.dumps({"prompt": prompt, "client_id": client_id}).encode("utf-8")
    request = urllib.request.Request(f"http://{server_address}/prompt", data=payload)
    return json.loads(urllib.request.urlopen(request).read())
def get_image(filename, subfolder, folder_type):
    """Download one image from the ComfyUI /view endpoint and return its raw bytes."""
    query = urllib.parse.urlencode(
        {"filename": filename, "subfolder": subfolder, "type": folder_type}
    )
    with urllib.request.urlopen(f"http://{server_address}/view?{query}") as response:
        return response.read()
def get_history(prompt_id):
    """Fetch the execution-history record for *prompt_id* and return it as a dict."""
    url = f"http://{server_address}/history/{prompt_id}"
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read())
def get_images(ws, prompt):
    """Queue *prompt* and collect the binary image frames streamed back over *ws*.

    Returns {node_id: [image_bytes, ...]}; the 8-byte binary header of each
    websocket frame is stripped before the payload is stored.
    """
    prompt_id = queue_prompt(prompt)["prompt_id"]
    collected = {}
    active_node = ""
    while True:
        frame = ws.recv()
        if not isinstance(frame, str):
            # Binary frame: image payload from the save-image websocket node.
            if active_node == "save_image_websocket_node":
                collected.setdefault(active_node, []).append(frame[8:])
            continue
        event = json.loads(frame)
        if event["type"] != "executing":
            continue
        info = event["data"]
        if info["prompt_id"] == prompt_id:
            if info["node"] is None:
                break  # server signalled that execution is done
            active_node = info["node"]
    return collected
def main():
    """Continuously generate images from sensor-driven prompts and interpolate
    each consecutive pair of output frames with RIFE.

    Runs forever: build a prompt, queue it on the ComfyUI server, then
    interpolate between the previous and current output PNGs.
    """
    with open("workflow_api.json") as workflow_api:
        prompt = json.load(workflow_api)

    ws = websocket.WebSocket()
    ws.connect(f"ws://{server_address}/ws?clientId={client_id}")

    interpolator = InterpolatorInterface()  # image-to-video (frame interpolation) backend

    frame_index = 300  # index of the most recent ComfyUI output image
    while True:
        next_index = frame_index + 1  # renamed from `next` to avoid shadowing the builtin

        # Build a fresh text prompt from the latest sensor/emotion data.
        prompt_text = generate_prompt()

        # Inject the text into the CLIPTextEncode node ("6") and queue the workflow.
        prompt["6"]["inputs"]["text"] = prompt_text
        queue_prompt(prompt)  # result was previously subscripted and discarded
        print(prompt_text)

        # Paths of the current and next ComfyUI output frames.
        # NOTE(review): nothing here waits for the server to finish rendering the
        # next frame before interpolating — confirm upstream timing guarantees.
        current_frame = os.path.join(comfyui_dir, f"output/ComfyUI_00{frame_index}_.png")
        next_frame = os.path.join(comfyui_dir, f"output/ComfyUI_00{next_index}_.png")

        # Interpolate between the two frames (exp=4 -> 2**4 = 16x frames).
        results = interpolator.generate(imgs=(current_frame, next_frame), exp=4)

        frame_index = next_index
        print("Output results to: ")
        print(results)
        print()


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,162 @@
# Input溫度, 情緒向量 < 4, 0, 0, 0, 1, 0, 0 >
import requests
import time
import random
def fetch_data():
    """Generator that polls the sensor endpoint roughly once per second.

    Yields (temperature, dominant_emotion_vector) tuples whenever the
    endpoint returns a non-empty JSON payload.
    """
    url = "http://140.119.108.248:89/ComfyUI_Data/"
    while True:
        payload = requests.get(url).json()
        time.sleep(1)  # throttle polling to ~1 request per second
        if payload:
            yield payload['temperature'], payload['dominant_emotion']
def get_dominant_emotion(emotions_vec):
    """Map an emotion-score vector to the label of its largest component.

    Order of labels matches the vector layout:
    angry, disgust, fear, happy, sad, surprise, neutral.
    Ties resolve to the earliest label (list.index returns the first maximum).
    """
    labels = ("angry", "disgust", "fear", "happy", "sad", "surprise", "neutral")
    return labels[emotions_vec.index(max(emotions_vec))]
# cloud_types_and_color
def choose_cloud_and_color(dominant_emotion):
    """Pick a cloud-type phrase and a colour phrase for *dominant_emotion*.

    Returns a (cloud_type, color) pair of prompt fragments; unrecognized
    emotions fall back to a neutral stratocumulus sky with no colour.
    """
    if dominant_emotion == "angry":
        cloud_type = "cumulus, "
        color = random.choice(["Red, ", "Dark red, "])
    elif dominant_emotion == "disgust":
        cloud_type = "stratus, "
        # "Grown" was a typo for "Brown" (Chinese variant uses 棕色 = brown).
        color = random.choice(["Brown, ", "Dark yellow, "])
    elif dominant_emotion == "fear":
        cloud_type = "cumulonimbus, "
        color = random.choice(["Red, ", "Black, "])
    elif dominant_emotion == "happy":
        cloud_type = random.choice(["cirrus, ", "cirrostratus, "])
        color = random.choice(["Yellow, ", "Bright green, "])
    elif dominant_emotion == "sad":
        cloud_type = random.choice(["altostratus, ", "nimbostratus, "])
        color = random.choice(["Gray, ", "Black, "])
    elif dominant_emotion == "surprise":
        # "cirroscumulus" was a misspelling of "cirrocumulus" (Chinese: 卷積雲).
        cloud_type = random.choice(["cirrocumulus, ", "altocumulus, "])
        color = random.choice(["Yellow, ", "Bright Green, ", "Bright Pink, "])
    else:
        cloud_type = "stratocumulus"
        color = "No color, "
    return cloud_type, color
# cloud_types_and_color
def choose_cloud_and_color_chinese(dominant_emotion):
    """Chinese-language counterpart of choose_cloud_and_color().

    Returns a (cloud_type, color) pair of Chinese prompt fragments mirroring
    the English emotion -> cloud/colour mapping.
    """
    if dominant_emotion == "angry":
        cloud_type = "積雲, "
        color = random.choice(["紅色, ", "暗紅色, "])
    elif dominant_emotion == "disgust":
        cloud_type = "層雲, "
        color = random.choice(["棕色, ", "深黃色, "])
    elif dominant_emotion == "fear":
        cloud_type = "積雨雲, "
        color = random.choice(["紅色, ", "黑色, "])
    elif dominant_emotion == "happy":
        # Aligned with the English mapping (cirrus/cirrostratus, yellow/bright green).
        # The original used 卷積雲 (cirrocumulus — surprise's cloud) and 深黃色
        # (dark yellow — disgust's colour), which looked like copy-paste slips.
        cloud_type = random.choice(["捲雲, ", "卷層雲, "])
        color = random.choice(["黃色, ", "亮綠色, "])
    elif dominant_emotion == "sad":
        cloud_type = random.choice(["高層雲, ", "雨層雲, "])
        color = random.choice(["灰色, ", "黑色, "])
    elif dominant_emotion == "surprise":
        cloud_type = random.choice(["卷積雲, ", "高積雲, "])
        color = random.choice(["黃色, ", "亮綠色, ", "亮粉色, "])
    else:
        cloud_type = "層積雲"
        color = "沒有顏色, "
    return cloud_type, color
# Season/timing vocabularies indexed by the pair returned from choose_date();
# index 4 is the "unknown / any" fallback used for out-of-range temperatures.
season_list = ['spring ', 'summer ', 'fall ', 'winter ', 'unknown ']
timing_list = ['morning, ', 'evening, ','noon, ', 'night, ', 'unknown, ' ]
season_chinese = ['春天', '夏天', '秋天', '冬天', '季節皆可']
timing_chinese = ['早上, ', '傍晚, ', '中午, ', '晚上, ', '時間皆可, ']
def choose_date(temperature):
    """Map a temperature (°C) to (season index, timing index).

    Indices select entries from the season/timing vocabulary lists;
    temperatures outside [0, 40] map to the "unknown" slot (4, 4).
    """
    if 0 <= temperature < 10:
        return random.randint(0, 3), 3                      # any season, night
    if 10 <= temperature < 20:
        return random.randint(2, 3), random.randint(0, 1)   # fall/winter, morning/evening
    if 20 <= temperature < 30:
        return random.randint(0, 1), random.randint(0, 1)   # spring/summer, morning/evening
    if 30 <= temperature <= 40:
        return random.randint(0, 3), 2                      # any season, noon
    return 4, 4                                             # out of range -> unknown
def generate_prompt():
    """Build one English image prompt from the next sensor reading.

    Pulls a single (temperature, emotion-vector) sample from fetch_data(),
    then assembles the fixed prompt prefix plus temperature, cloud, colour,
    season and timing fragments, printing and returning the result.
    """
    for temperature, emotions_vec in fetch_data():
        dominant_emotion = get_dominant_emotion(emotions_vec)
        cloud_types, color = choose_cloud_and_color(dominant_emotion)
        season_num, timing_num = choose_date(temperature)
        prompt = (
            "weather forecast, no people, only show the sky, fantasy style, "
            + str(temperature)
            + " degrees Celsius, "
            + cloud_types
            + color
            + season_list[season_num]
            + timing_list[timing_num]
        )
        print(prompt)
        # e.g. "... fantasy style, 33 degrees Celsius, cirrus, Bright green, spring noon,"
        return prompt
def generate_chinese_prompt():
    """Build one Chinese image prompt from the next sensor reading.

    Same flow as generate_prompt() but uses the Chinese cloud/colour and
    season/timing vocabularies; prints and returns the assembled prompt.
    """
    for temperature, emotions_vec in fetch_data():
        dominant_emotion = get_dominant_emotion(emotions_vec)
        cloud_types, color = choose_cloud_and_color_chinese(dominant_emotion)
        season_num, timing_num = choose_date(temperature)
        prompt = (
            "天氣預報, 沒有人物, 只顯示雲層, 奇幻風格, "
            + str(temperature)
            + " 攝氏度, "
            + cloud_types
            + color
            + season_chinese[season_num]
            + timing_chinese[timing_num]
        )
        print(prompt)
        return prompt
# if __name__ == "__main__":
# generate_prompt()

101
test_prompt/test.py Normal file
View File

@@ -0,0 +1,101 @@
#This is an example that uses the websockets api and the SaveImageWebsocket node to get images directly without
#them being saved to disk
import websocket #NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import urllib.request
import urllib.parse
import time
import requests
import random
# ComfyUI server endpoint (host:port) used by every HTTP and websocket call below.
server_address = "127.0.0.1:8188"
# Random per-run client id so the server can route websocket messages to this session.
client_id = str(uuid.uuid4())
def queue_prompt(prompt):
    """Submit *prompt* to the ComfyUI /prompt endpoint; return the parsed JSON reply."""
    body = {"prompt": prompt, "client_id": client_id}
    req = urllib.request.Request(
        f"http://{server_address}/prompt",
        data=json.dumps(body).encode("utf-8"),
    )
    return json.loads(urllib.request.urlopen(req).read())
def get_image(filename, subfolder, folder_type):
    """Return the raw bytes of one image served by ComfyUI's /view endpoint."""
    params = urllib.parse.urlencode(
        {"filename": filename, "subfolder": subfolder, "type": folder_type}
    )
    with urllib.request.urlopen(f"http://{server_address}/view?{params}") as response:
        return response.read()
def get_history(prompt_id):
    """Return the ComfyUI execution-history entry for *prompt_id* as a dict."""
    history_url = f"http://{server_address}/history/{prompt_id}"
    with urllib.request.urlopen(history_url) as response:
        return json.loads(response.read())
def get_images(ws, prompt):
    """Queue *prompt* and gather the binary images streamed back over *ws*.

    Listens until the server reports the workflow finished, returning
    {node_id: [image_bytes, ...]} with each frame's 8-byte header removed.
    """
    prompt_id = queue_prompt(prompt)["prompt_id"]
    images_by_node = {}
    node_in_progress = ""
    done = False
    while not done:
        received = ws.recv()
        if isinstance(received, str):
            msg = json.loads(received)
            if msg["type"] == "executing" and msg["data"]["prompt_id"] == prompt_id:
                node = msg["data"]["node"]
                if node is None:
                    done = True  # server signalled completion
                else:
                    node_in_progress = node
        elif node_in_progress == "save_image_websocket_node":
            images_by_node.setdefault(node_in_progress, []).append(received[8:])
    return images_by_node
# prompt_text = """
# {
# ...
# }
# """
# prompt = json.loads(prompt_text)
def main():
    """One-shot smoke test: load the workflow, set a fixed prompt, and queue it."""
    with open("workflow_api.json") as workflow_api:
        prompt = json.load(workflow_api)

    ws = websocket.WebSocket()
    ws.connect(f"ws://{server_address}/ws?clientId={client_id}")

    # Fixed text fed into the CLIPTextEncode node ("6") of the workflow.
    prompt_text = "23 degrees Celsius, no people, only show the sky, weather forecast, cloudy"
    prompt["6"]["inputs"]["text"] = prompt_text

    # Queue the workflow; the prompt_id lookup mirrors the original code
    # (its value is not used here).
    queue_prompt(prompt)["prompt_id"]

    # Echo the prompt that was submitted.
    print(prompt_text)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,107 @@
{
"3": {
"inputs": {
"seed": 787475629082271,
"steps": 20,
"cfg": 8,
"sampler_name": "euler",
"scheduler": "normal",
"denoise": 1,
"model": [
"4",
0
],
"positive": [
"6",
0
],
"negative": [
"7",
0
],
"latent_image": [
"5",
0
]
},
"class_type": "KSampler",
"_meta": {
"title": "KSampler"
}
},
"4": {
"inputs": {
"ckpt_name": "absolutereality_v181.safetensors"
},
"class_type": "CheckpointLoaderSimple",
"_meta": {
"title": "Load Checkpoint"
}
},
"5": {
"inputs": {
"width": 512,
"height": 512,
"batch_size": 1
},
"class_type": "EmptyLatentImage",
"_meta": {
"title": "Empty Latent Image"
}
},
"6": {
"inputs": {
"text" : "",
"clip": [
"4",
1
]
},
"class_type": "CLIPTextEncode",
"_meta": {
"title": "CLIP Text Encode (Prompt)"
}
},
"7": {
"inputs": {
"text": "text, watermark, people",
"clip": [
"4",
1
]
},
"class_type": "CLIPTextEncode",
"_meta": {
"title": "CLIP Text Encode (Prompt)"
}
},
"8": {
"inputs": {
"samples": [
"3",
0
],
"vae": [
"4",
2
]
},
"class_type": "VAEDecode",
"_meta": {
"title": "VAE Decode"
}
},
"9": {
"inputs": {
"filename_prefix": "ComfyUI",
"images": [
"8",
0
]
},
"class_type": "SaveImage",
"_meta": {
"title": "Save Image"
}
}
}