import spaces
import os
import gradio as gr
import torch
import numpy as np
import cv2
import safetensors
from PIL import Image, ImageDraw
from diffusers import AutoencoderKL
from diffusers.utils import load_image, check_min_version
from controlnet_flux import FluxControlNetModel
from pipeline_flux_controlnet_inpaint import FluxControlNetInpaintingPipeline
from transformers import AutoProcessor, pipeline, AutoModelForMaskGeneration
from diffusers.models.attention_processor import Attention
from dataclasses import dataclass
from typing import Any, List, Dict, Optional, Union, Tuple
from diffusers import BitsAndBytesConfig as DiffusersBitsAndBytesConfig, FluxTransformer2DModel, FluxPipeline
from transformers import BitsAndBytesConfig as BitsAndBytesConfig, T5EncoderModel

# Ensure that the minimal version of diffusers is installed
check_min_version("0.30.2")
HF_TOKEN = os.getenv("HF_TOKEN")
os.environ['PYTORCH_NO_CUDA_MEMORY_CACHING'] = '1'
dtype = torch.bfloat16

good_vae = AutoencoderKL.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    subfolder="vae",
    torch_dtype=dtype,
    use_safetensors=True,
    token=HF_TOKEN
).to("cuda")
# quant_config = DiffusersBitsAndBytesConfig(load_in_8bit=True)
# transformer_8bit = FluxTransformer2DModel.from_pretrained(
#     "black-forest-labs/FLUX.1-dev",
#     subfolder="transformer",
#     quantization_config=quant_config,
#     torch_dtype=dtype,
#     token=HF_TOKEN
# )

# Quantize the text encoder to 8-bit precision
quant_config = BitsAndBytesConfig(load_in_8bit=True)
text_encoder_8bit = T5EncoderModel.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    subfolder="text_encoder_2",
    quantization_config=quant_config,
    torch_dtype=torch.float16,
    token=HF_TOKEN
)
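
# The 8-bit T5 encoder replaces the pipeline's second text encoder further below
# (pipe.text_encoder_2 = text_encoder_8bit) so prompt encoding runs in 8-bit and
# saves GPU memory; the quantized transformer above is left commented out.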
# # Load necessary models and processors
# controlnet = FluxControlNetModel.from_pretrained("alimama-creative/FLUX.1-dev-Controlnet-Inpainting-Beta", torch_dtype=torch.bfloat16)
# pipe = FluxControlNetInpaintingPipeline.from_pretrained(
#     "LPX55/FLUX.1-merged_uncensored",
#     vae=good_vae,
#     # transformer=transformer_8bit,
#     controlnet=controlnet,
#     torch_dtype=dtype,
#     use_safetensors=True,
#     token=HF_TOKEN
# ).to("cuda")
controlnet = FluxControlNetModel.from_pretrained("alimama-creative/FLUX.1-dev-Controlnet-Inpainting-Beta", torch_dtype=torch.bfloat16)
pipe = FluxControlNetInpaintingPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    controlnet=controlnet,
    torch_dtype=torch.bfloat16
).to("cuda")
pipe.transformer.to(torch.bfloat16)
pipe.controlnet.to(torch.bfloat16)
pipe.text_encoder_2 = text_encoder_8bit
base_attn_procs = pipe.transformer.attn_processors.copy()
detector_id = "IDEA-Research/grounding-dino-tiny"
segmenter_id = "facebook/sam-vit-base"
segmentator = AutoModelForMaskGeneration.from_pretrained(segmenter_id).cuda()
segment_processor = AutoProcessor.from_pretrained(segmenter_id)
object_detector = pipeline(model=detector_id, task="zero-shot-object-detection", device=torch.device("cuda"))
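
# --- Grounded segmentation utilities ---
# Grounding DINO proposes boxes for a text label, SAM converts each box into a mask,
# and the helpers below translate masks to/from polygons for optional refinement.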
@dataclass
class BoundingBox:
    xmin: int
    ymin: int
    xmax: int
    ymax: int

    @property
    def xyxy(self) -> List[float]:
        return [self.xmin, self.ymin, self.xmax, self.ymax]


@dataclass
class DetectionResult:
    score: float
    label: str
    box: BoundingBox
    mask: Optional[np.ndarray] = None

    @classmethod
    def from_dict(cls, detection_dict: Dict) -> 'DetectionResult':
        return cls(score=detection_dict['score'],
                   label=detection_dict['label'],
                   box=BoundingBox(xmin=detection_dict['box']['xmin'],
                                   ymin=detection_dict['box']['ymin'],
                                   xmax=detection_dict['box']['xmax'],
                                   ymax=detection_dict['box']['ymax']))
def mask_to_polygon(mask: np.ndarray) -> List[List[int]]:
    contours, _ = cv2.findContours(mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours:
        return []
    largest_contour = max(contours, key=cv2.contourArea)
    polygon = largest_contour.reshape(-1, 2).tolist()
    return polygon


def polygon_to_mask(polygon: List[Tuple[int, int]], image_shape: Tuple[int, int]) -> np.ndarray:
    mask = np.zeros(image_shape, dtype=np.uint8)
    pts = np.array(polygon, dtype=np.int32)
    cv2.fillPoly(mask, [pts], color=(255,))
    return mask
def get_boxes(results: List[DetectionResult]) -> List[List[List[float]]]:
    boxes = []
    for result in results:
        xyxy = result.box.xyxy
        boxes.append(xyxy)
    return [boxes]
def refine_masks(masks: torch.BoolTensor, polygon_refinement: bool = False) -> List[np.ndarray]:
    masks = masks.cpu().float()
    masks = masks.permute(0, 2, 3, 1)
    masks = masks.mean(axis=-1)
    masks = (masks > 0).int()
    masks = masks.numpy().astype(np.uint8)
    masks = list(masks)
    if polygon_refinement:
        for idx, mask in enumerate(masks):
            shape = mask.shape
            polygon = mask_to_polygon(mask)
            mask = polygon_to_mask(polygon, shape)
            masks[idx] = mask
    return masks
def detect(
    object_detector,
    image: Image.Image,
    labels: List[str],
    threshold: float = 0.3,
    detector_id: Optional[str] = None
) -> List[Dict[str, Any]]:
    # Grounding DINO expects each candidate label to end with a period.
    labels = [label if label.endswith(".") else label + "." for label in labels]
    results = object_detector(image, candidate_labels=labels, threshold=threshold)
    results = [DetectionResult.from_dict(result) for result in results]
    return results
def segment(
    segmentator,
    processor,
    image_tensor: torch.Tensor,
    detection_results: List[Dict[str, Any]],
    polygon_refinement: bool = False
) -> List[DetectionResult]:
    device = image_tensor.device
    boxes = get_boxes(detection_results)
    # The processor works on CPU data; the tensor is already scaled to [0, 1],
    # so disable the processor's own 1/255 rescaling.
    image_tensor_float32 = image_tensor.to(torch.float32).cpu()
    inputs = processor(images=image_tensor_float32, input_boxes=boxes, return_tensors="pt", do_rescale=False).to(device)
    # Run SAM and upsample the low-resolution masks back to the input size
    with torch.no_grad():
        outputs = segmentator(**inputs)
    masks = processor.post_process_masks(
        masks=outputs.pred_masks,
        original_sizes=inputs.original_sizes,
        reshaped_input_sizes=inputs.reshaped_input_sizes
    )[0]
    masks = refine_masks(masks, polygon_refinement)
    for detection_result, mask in zip(detection_results, masks):
        detection_result.mask = mask
    return detection_results
def grounded_segmentation(
    detect_pipeline,
    segmentator,
    segment_processor,
    image: Union[Image.Image, str],
    labels: List[str],
    threshold: float = 0.3,
    polygon_refinement: bool = False,
    detector_id: Optional[str] = None,
    segmenter_id: Optional[str] = None
) -> Tuple[np.ndarray, List[DetectionResult]]:
    if isinstance(image, str):
        image = load_image(image)
    # Convert the image to a normalized float32 CUDA tensor for processing
    image_tensor = torch.tensor(np.array(image), dtype=torch.float32, device="cuda").permute(2, 0, 1).unsqueeze(0) / 255.0
    detections = detect(detect_pipeline, image, labels, threshold, detector_id)
    detections = segment(segmentator, segment_processor, image_tensor, detections, polygon_refinement)
    # Convert the image tensor back to a uint8 numpy array for return
    image_array = image_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy() * 255
    image_array = image_array.astype(np.uint8)
    return image_array, detections
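
# --- Custom attention processor for diptych prompting ---
# Drop-in replacement for the default Flux attention processor. When attn_enforce != 1.0
# it computes the attention weights explicitly and scales the weights that right-panel
# (generated) pixels place on left-panel (reference) pixels, strengthening reference
# attention during inpainting.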
class CustomFluxAttnProcessor2_0:
    def __init__(self, height=44, width=88, attn_enforce=1.0):
        if not hasattr(torch.nn.functional, "scaled_dot_product_attention"):
            raise ImportError("FluxAttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0.")
        self.height = height
        self.width = width
        self.num_pixels = height * width
        self.step = 0
        self.attn_enforce = attn_enforce

    def __call__(
        self,
        attn: Attention,
        hidden_states: torch.FloatTensor,
        encoder_hidden_states: torch.FloatTensor = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        image_rotary_emb: Optional[torch.Tensor] = None,
    ) -> torch.FloatTensor:
        self.step += 1
        batch_size, _, _ = hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
        query = attn.to_q(hidden_states)
        key = attn.to_k(hidden_states)
        value = attn.to_v(hidden_states)
        inner_dim = key.shape[-1]
        head_dim = inner_dim // attn.heads
        query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
        key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
        value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
        if attn.norm_q is not None:
            query = attn.norm_q(query)
        if attn.norm_k is not None:
            key = attn.norm_k(key)
        # Joint blocks also project the text (encoder) stream and prepend it to the image tokens
        if encoder_hidden_states is not None:
            encoder_hidden_states_query_proj = attn.add_q_proj(encoder_hidden_states)
            encoder_hidden_states_key_proj = attn.add_k_proj(encoder_hidden_states)
            encoder_hidden_states_value_proj = attn.add_v_proj(encoder_hidden_states)
            encoder_hidden_states_query_proj = encoder_hidden_states_query_proj.view(
                batch_size, -1, attn.heads, head_dim
            ).transpose(1, 2)
            encoder_hidden_states_key_proj = encoder_hidden_states_key_proj.view(
                batch_size, -1, attn.heads, head_dim
            ).transpose(1, 2)
            encoder_hidden_states_value_proj = encoder_hidden_states_value_proj.view(
                batch_size, -1, attn.heads, head_dim
            ).transpose(1, 2)
            if attn.norm_added_q is not None:
                encoder_hidden_states_query_proj = attn.norm_added_q(encoder_hidden_states_query_proj)
            if attn.norm_added_k is not None:
                encoder_hidden_states_key_proj = attn.norm_added_k(encoder_hidden_states_key_proj)
            query = torch.cat([encoder_hidden_states_query_proj, query], dim=2)
            key = torch.cat([encoder_hidden_states_key_proj, key], dim=2)
            value = torch.cat([encoder_hidden_states_value_proj, value], dim=2)
        if image_rotary_emb is not None:
            from diffusers.models.embeddings import apply_rotary_emb
            query = apply_rotary_emb(query, image_rotary_emb)
            key = apply_rotary_emb(key, image_rotary_emb)
        if self.attn_enforce != 1.0:
            # Compute attention explicitly so the reference-to-target weights can be rescaled
            attn_probs = (torch.einsum('bhqd,bhkd->bhqk', query, key) * attn.scale).softmax(dim=-1)
            img_attn_probs = attn_probs[:, :, -self.num_pixels:, -self.num_pixels:]
            img_attn_probs = img_attn_probs.reshape((batch_size, attn.heads, self.height, self.width, self.height, self.width))
            img_attn_probs[:, :, :, self.width//2:, :, :self.width//2] *= self.attn_enforce
            img_attn_probs = img_attn_probs.reshape((batch_size, attn.heads, self.num_pixels, self.num_pixels))
            attn_probs[:, :, -self.num_pixels:, -self.num_pixels:] = img_attn_probs
            hidden_states = torch.einsum('bhqk,bhkd->bhqd', attn_probs, value)
        else:
            hidden_states = torch.nn.functional.scaled_dot_product_attention(query, key, value, dropout_p=0.0, is_causal=False)
        hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
        hidden_states = hidden_states.to(query.dtype)
        if encoder_hidden_states is not None:
            encoder_hidden_states, hidden_states = (
                hidden_states[:, : encoder_hidden_states.shape[1]],
                hidden_states[:, encoder_hidden_states.shape[1] :],
            )
            hidden_states = attn.to_out[0](hidden_states)
            hidden_states = attn.to_out[1](hidden_states)
            encoder_hidden_states = attn.to_add_out(encoder_hidden_states)
            return hidden_states, encoder_hidden_states
        else:
            return hidden_states
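
# --- Inference helpers ---
# segment_image isolates the subject, make_diptych places it in the left panel of a
# double-width canvas, and inpaint_image masks the right panel and asks the ControlNet
# inpainting pipeline to fill it according to the prompt.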
def segment_image(image, object_name):
    image_array, detections = grounded_segmentation(
        object_detector,
        segmentator,
        segment_processor,
        image=image,
        labels=[object_name],
        threshold=0.3,
        polygon_refinement=True,
    )
    # Keep the detected subject and black out the background (mask values are 0/255)
    segment_result = image_array * np.expand_dims(detections[0].mask / 255, axis=-1)
    segmented_image = Image.fromarray(segment_result.astype(np.uint8))
    return segmented_image
def make_diptych(image):
    # Reference image in the left panel, blank right panel to be inpainted
    ref_image = np.array(image)
    ref_image = np.concatenate([ref_image, np.zeros_like(ref_image)], axis=1)
    ref_image = Image.fromarray(ref_image)
    return ref_image
# `import spaces` indicates a ZeroGPU Space, so the GPU-bound entry point is decorated
@spaces.GPU
def inpaint_image(image, prompt, object_name):
    width = 512
    height = 512
    size = (width * 2, height)
    diptych_text_prompt = f"A diptych with two side-by-side images of same {object_name}. On the left, a photo of {object_name}. On the right, {prompt}"
    reference_image = image.resize((width, height)).convert("RGB")
    segmented_image = segment_image(reference_image, object_name)
    # Mask only the right half of the diptych so the left (reference) panel is preserved
    mask_image = np.concatenate([np.zeros((height, width, 3)), np.ones((height, width, 3)) * 255], axis=1)
    mask_image = Image.fromarray(mask_image.astype(np.uint8))
    diptych_image_prompt = make_diptych(segmented_image)
    # Swap in custom attention processors sized for the diptych latent grid
    new_attn_procs = base_attn_procs.copy()
    for k in new_attn_procs:
        new_attn_procs[k] = CustomFluxAttnProcessor2_0(height=height // 16, width=width // 16 * 2, attn_enforce=1.3)
    pipe.transformer.set_attn_processor(new_attn_procs)
    generator = torch.Generator(device="cuda").manual_seed(42)
    with torch.no_grad():
        result = pipe(
            prompt=diptych_text_prompt,
            height=size[1],
            width=size[0],
            control_image=diptych_image_prompt,
            control_mask=mask_image,
            num_inference_steps=20,
            generator=generator,
            controlnet_conditioning_scale=0.95,
            guidance_scale=3.5,
            negative_prompt="",
            true_guidance_scale=3.5
        ).images[0]
    # Return only the generated right panel
    result = result.crop((width, 0, width * 2, height))
    torch.cuda.empty_cache()
    return result, diptych_image_prompt
# Create Gradio interface
iface = gr.Interface(
    fn=inpaint_image,
    inputs=[
        gr.Image(type="pil", label="Upload Image"),
        gr.Textbox(lines=3, value="replicate this {subject_name} exactly but as a photo of the {subject_name} surfing on the beach", label="Prompt"),
        gr.Textbox(lines=1, value="bear plushie", label="Subject Name")
    ],
    outputs=[
        gr.Image(type="pil", label="Inpainted Image"),
        gr.Image(type="pil", label="Diptych Image")
    ],
    title="FLUX Inpainting with Diptych Prompting",
    description="Upload an image, specify a prompt, and provide the subject name. The app will automatically generate the inpainted image."
)

# Launch the app
iface.launch()
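
# Note (assumption): for long-running GPU jobs on Spaces it is common to enable the
# request queue before launching, e.g. `iface.queue(max_size=10).launch()`; the plain
# `launch()` above keeps the original behavior.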