import gradio as gr
import requests
import os
import time
import json
from datetime import datetime
import oss2
import cv2
from pathlib import Path
import decord
from gradio.utils import get_cache_folder

cache_version = 20250325
dashscope_api_key = os.getenv("API_KEY", "")


class Examples(gr.helpers.Examples):
    """gr.Examples subclass that caches example files under a custom directory."""

    def __init__(self, *args, directory_name=None, **kwargs):
        super().__init__(*args, **kwargs, _initiated_directly=False)
        if directory_name is not None:
            self.cached_folder = get_cache_folder() / directory_name
            self.cached_file = Path(self.cached_folder) / "log.csv"
        self.create()


def upload_to_oss(local_file_path, remote_file_path, expire_time=3600):
    """Upload a local file to OSS (with retries) and return a signed GET url."""
    remote_url = "motionshop/%s/%s" % (datetime.now().strftime("%Y%m%d"), remote_file_path)
    for i in range(5):
        try:
            from oss2.credentials import EnvironmentVariableCredentialsProvider
            auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
            bucket = oss2.Bucket(auth, 'oss-cn-hangzhou.aliyuncs.com', 'virtualbuy-devo')
            bucket.put_object_from_file(key=remote_url, filename=local_file_path)
            break
        except Exception as e:
            if i < 4:  # Not the last retry: wait 2 seconds, then try again
                time.sleep(2)
                continue
            else:  # Last retry still failed
                raise e
    return bucket.sign_url('GET', remote_url, expire_time)


def get_url(filepath):
    filename = os.path.basename(filepath)
    remote_file_path = "test/%s" % filename
    return upload_to_oss(filepath, remote_file_path)


def online_detect(filepath):
    """Call the synchronous human-detection api on an uploaded video url."""
    url = "https://poc-dashscope.aliyuncs.com/api/v1/services/default/default/default"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(dashscope_api_key)
    }
    data = {
        "model": "pre-motionshop-detect-gradio",
        "input": {
            "video_url": filepath
        },
        "parameters": {
            "threshold": 0.4,
            "min_area_ratio": 0.001
        }
    }
    print("Call detect api, params: " + json.dumps(data))
    query_result_request = requests.post(url, json=data, headers=headers)
    print("Detect api returned: " + query_result_request.text)
    return json.loads(query_result_request.text)


def online_render(filepath, frame_id, bbox, replacement_ids, cache_url=None, model="pre-motionshop-render-gradio"):
    """Submit an async render task that replaces the boxed characters in the video."""
    url = "https://poc-dashscope.aliyuncs.com/api/v1/services/async-default/async-default/async-default"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(dashscope_api_key),
        "X-DashScope-Async": "enable"
    }
    data = {
        "model": model,
        "input": {
            "video_url": filepath,
            "frame_index": frame_id,
            "bbox": bbox,
            "replacement_id": replacement_ids
        },
        "parameters": {}
    }
    if cache_url is not None:
        data["input"]["cache_url"] = cache_url
    print("Call render video api with params: " + json.dumps(data))
    query_result_request = requests.post(url, json=data, headers=headers)
    print("Render video api returned: " + query_result_request.text)
    return json.loads(query_result_request.text)


def get_async_result(task_id):
    """Poll a DashScope async task once per second until it reaches a terminal status."""
    while True:
        result = requests.post(
            "https://poc-dashscope.aliyuncs.com/api/v1/tasks/%s" % task_id,
            headers={
                "Authorization": "Bearer {}".format(dashscope_api_key),
            }
        )
        result = json.loads(result.text)
        if "output" in result and result["output"]["task_status"] in ["SUCCEEDED", "FAILED"]:
            break
        time.sleep(1)
    return result


def save_video_cv2(vid, resize_video_input, resize_h, resize_w, fps):
    """Write the decord frames back to disk at the target resolution (XVID codec)."""
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(resize_video_input, fourcc, fps, (resize_w, resize_h))
    for idx in range(len(vid)):
        frame = vid[idx].asnumpy()[:, :, ::-1]  # RGB -> BGR for OpenCV
        frame = cv2.resize(frame, (resize_w, resize_h))
        out.write(frame)
    out.release()
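
# Hedged sketch (not part of the original app): get_async_result above polls
# forever if a task never reaches a terminal status. A bounded variant could
# look like this; `timeout_s` is an assumed parameter name.
def get_async_result_with_timeout(task_id, timeout_s=600):
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        response = requests.post(
            "https://poc-dashscope.aliyuncs.com/api/v1/tasks/%s" % task_id,
            headers={"Authorization": "Bearer {}".format(dashscope_api_key)},
        )
        result = json.loads(response.text)
        if "output" in result and result["output"]["task_status"] in ["SUCCEEDED", "FAILED"]:
            return result
        time.sleep(1)
    raise TimeoutError("Task %s did not finish within %d seconds" % (task_id, timeout_s))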

def detect_human(video_input):
    """Upload the video, call the detection api, and draw up to three candidate boxes."""
    video_input_basename = os.path.basename(video_input)
    resize_video_input = os.path.join(
        os.path.dirname(video_input),
        video_input_basename.split(".")[0] + "_resize." + video_input_basename.split(".")[-1]
    )
    vid = decord.VideoReader(video_input)
    fps = vid.get_avg_fps()
    H, W, C = vid[0].shape
    # Downscale so the longer side is at most 1280 pixels
    if H > 1280 or W > 1280:
        if H > W:
            resize_h, resize_w = 1280, int(W * 1280 / H)
        else:
            resize_h, resize_w = int(H * 1280 / W), 1280
        save_video_cv2(vid, resize_video_input, resize_h, resize_w, fps)
        new_video_input = resize_video_input
    else:
        new_video_input = video_input
    video_url = get_url(new_video_input)
    detect_result = online_detect(video_url)
    check_result = "output" in detect_result
    if not check_result:
        raise gr.Error("Human detection failed: " + json.dumps(detect_result))
    select_frame_index = detect_result["output"]["frame_index"]
    boxes = detect_result["output"]["bbox"][:3]  # keep at most three characters
    print("Detected %d characters" % len(boxes))
    cap = cv2.VideoCapture(new_video_input)
    cap.set(cv2.CAP_PROP_POS_FRAMES, select_frame_index)
    _, box_image = cap.read()
    box_image = cv2.cvtColor(box_image, cv2.COLOR_BGR2RGB)
    width, height = box_image.shape[1], box_image.shape[0]
    # Boxes come back as normalized (cx, cy, w, h); draw them in red/green/blue
    colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
    for i, box in enumerate(boxes):
        box = [
            (box[0] - box[2] / 2) * width,
            (box[1] - box[3] / 2) * height,
            (box[0] + box[2] / 2) * width,
            (box[1] + box[3] / 2) * height
        ]
        box_image = cv2.rectangle(
            box_image,
            (int(box[0]), int(box[1])),
            (int(box[2]), int(box[3])),
            colors[i], 2
        )
    video_state = {
        "check_result": check_result,
        "select_frame_index": select_frame_index,
        "box": boxes,
        "replace_ids": [],
        "image_to_3d_tasks": {},
        "video_url": video_url,
        "video_path": new_video_input
    }
    return video_state, box_image, gr.update(visible=True), gr.update(visible=False)
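
# Hedged sketch (helper not present in the original): the inline box math in
# detect_human converts a normalized center-format box to pixel corners; here
# is the same conversion as a standalone function, with assumed parameter names.
def box_cxcywh_to_xyxy(box, image_width, image_height):
    cx, cy, w, h = box
    return (
        int((cx - w / 2) * image_width),
        int((cy - h / 2) * image_height),
        int((cx + w / 2) * image_width),
        int((cy + h / 2) * image_height),
    )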

def predict(video_state, first_image, second_image, third_image):
    """Generate a 3d avatar for each uploaded photo, then render them into the video."""
    if len(video_state["box"]) == 0:
        return None, "No human detected, please use a video with a clearly visible person"
    print("images:", first_image, second_image, third_image)
    tasks = []
    boxes = []
    if first_image is not None and len(video_state["box"]) >= 1:
        tasks.append(image_to_3d(first_image))
        boxes.append(video_state["box"][0])
    if second_image is not None and len(video_state["box"]) >= 2:
        tasks.append(image_to_3d(second_image))
        boxes.append(video_state["box"][1])
    if third_image is not None and len(video_state["box"]) >= 3:
        tasks.append(image_to_3d(third_image))
        boxes.append(video_state["box"][2])
    if len(tasks) == 0:
        return None, "Please upload at least one character photo for replacement."
    ids = []
    for t in tasks:
        try:
            image_to_3d_result = get_async_result(t)
            print("image to 3d finished", image_to_3d_result)
            ids.append(image_to_3d_result["output"]["ply_url"])
        except Exception as e:
            print(e)
            return None, "Error in 3D model generation, please check the uploaded image"
    if video_state["check_result"]:
        try:
            taskid = online_render(
                video_state["video_url"], video_state["select_frame_index"], boxes, ids, None
            )["output"]["task_id"]
            task_output = get_async_result(taskid)
            print("Video synthesis completed, api returned: " + json.dumps(task_output))
            video_url = task_output["output"]["synthesis_video_url"]
            return video_url, "Processing Success"
        except Exception as e:
            print(e)
            return None, "Error in video synthesis, please change the material and try again"
    else:
        return None, "Error in human detection, please use a video with a clearly visible person"


def online_img_to_3d(img_url):
    """Submit an async image-to-3d task for an uploaded image url."""
    url = "https://poc-dashscope.aliyuncs.com/api/v1/services/async-default/async-default/async-default"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(dashscope_api_key),
        "X-DashScope-Async": "enable"
    }
    data = {
        "model": "pre-image-to-3d-gradio",
        "input": {
            "image_url": img_url,
        },
        "parameters": {}
    }
    print("Call image to 3d api, params: " + json.dumps(data))
    query_result_request = requests.post(url, json=data, headers=headers)
    return json.loads(query_result_request.text)


def image_to_3d(image_path):
    """Upload a photo and submit an async image-to-3d task; returns the task id."""
    url = get_url(image_path)
    task_send_result = online_img_to_3d(url)
    image_to_3d_task_id = task_send_result["output"]["task_id"]
    return image_to_3d_task_id
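
# Minimal usage sketch of the image-to-3d pipeline above (the image path is
# hypothetical; "ply_url" is the field predict() reads from the task output):
#
#   task_id = image_to_3d("files/example_images/portrait.png")
#   result = get_async_result(task_id)
#   ply_url = result["output"]["ply_url"]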

def gradio_demo():
    with gr.Blocks() as iface:
        # Per-session state shared between the detect and predict steps
        video_state = gr.State(
            {
                "check_result": False,
                "select_frame_index": 0,
                "box": [],
                "replace_ids": [],
                "image_to_3d_tasks": {},
                "video_url": "",
                "video_path": ""
            }
        )
        gr.HTML(
            """
            <h1>Motionshop2</h1>
            Project Page Project Page Project Page
            """
        )
        gr.Markdown("""
        1. Choose or upload a video (duration <= 15s, resolution <= 720p)
        """)
""") with gr.Row(): with gr.Column(): gr.HTML(""" """) video_input = gr.Video(elem_id="input_video") template_frame = gr.Image(type="pil",interactive=True, elem_id="template_frame", visible=False) Examples( fn=detect_human, examples=sorted([ os.path.join("files", "example_videos", name) for name in os.listdir(os.path.join("files", "example_videos")) ]), run_on_click=True, inputs=[video_input], outputs=[video_state, template_frame, template_frame, video_input], directory_name="examples_videos", cache_examples=False, ) gr.Markdown("""

2.Choose or upload images to replace

""") with gr.Row(): with gr.Column(): gr.Markdown("Replace the character in the red box with...") with gr.Row(): first_image = gr.Image(type="filepath",interactive=True, elem_id="first_image", visible=True, height=480, width=270) first_example = gr.Examples( examples=sorted([os.path.join("files", "example_images", name) for name in os.listdir(os.path.join("files", "example_images"))]), inputs=[first_image], examples_per_page=6 ) with gr.Column(): gr.Markdown("Replace the character in the green box with...") with gr.Row(): second_image = gr.Image(type="filepath",interactive=True, elem_id="second_image", visible=True, height=480, width=270) second_example = gr.Examples( examples=sorted([os.path.join("files", "example_images", name) for name in os.listdir(os.path.join("files", "example_images"))]), inputs=[second_image], examples_per_page=6 ) with gr.Column(): gr.Markdown("Replace the character in the blue box with...") with gr.Row(): third_image = gr.Image(type="filepath",interactive=True, elem_id="third_image", visible=True, height=480, width=270) third_example = gr.Examples( examples=sorted([os.path.join("files", "example_images", name) for name in os.listdir(os.path.join("files", "example_images"))]), inputs=[third_image], examples_per_page=6 ) gr.Markdown("""

3.Click Start (each generation may take 3 minutes due to the use of SOTA video inpainting and pose estimation methods)

""") with gr.Row(): with gr.Column(): motion_shop_predict_button = gr.Button(value="Start", variant="primary") video_output = gr.Video(elem_id="output_video") error_message = gr.Textbox(label="Processing Status", visible=True, interactive=False) video_input.upload( fn=detect_human, inputs=[ video_input ], outputs=[video_state, template_frame, template_frame, video_input], ) motion_shop_predict_button.click( fn=predict, inputs=[video_state, first_image, second_image, third_image], outputs=[video_output, error_message] ) # clear input template_frame.clear( lambda: ( { "check_result": False, "select_frame_index": 0, "box": [], "replace_ids": [], "image_to_3d_tasks": {}, "video_url": "", "video_path": "" }, None, None, None, gr.update(visible=True), gr.update(visible=False), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value="") ), [], [ video_state, video_output, template_frame, video_input, video_input, template_frame, first_image, second_image, third_image, error_message ], queue=False, show_progress=False) # print("username:", uuid_output_field) # set example # gr.Markdown("## Examples") # gr.Examples( # examples=[os.path.join(os.path.dirname(__file__), "./test_sample/", test_sample) for test_sample in ["test-sample8.mp4","test-sample4.mp4", \ # "test-sample2.mp4","test-sample13.mp4"]], # fn=run_example, # inputs=[ # e.s video_input # ], # outputs=[video_input], # # cache_examples=True, # ) iface.queue(default_concurrency_limit=200) iface.launch(debug=False, max_threads=10, server_name="0.0.0.0") if __name__=="__main__": gradio_demo() # iface.launch(debug=True, enable_queue=True)