ChenDY commited on
Commit
42d7985
·
1 Parent(s): 2619fea
.gitignore ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ .idea/
2
+
3
+ __pycache__/
4
+ *.py[cod]
5
+ *$py.class
6
+
README.md CHANGED
@@ -1,12 +1,15 @@
1
  ---
2
  title: NAG FLUX.1-dev
3
- emoji: 🏢
4
- colorFrom: green
5
- colorTo: purple
6
  sdk: gradio
7
  sdk_version: 5.32.0
8
  app_file: app.py
9
  pinned: false
 
 
10
  ---
11
 
 
12
  Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
1
  ---
2
  title: NAG FLUX.1-dev
3
+ emoji: 🚀
4
+ colorFrom: purple
5
+ colorTo: gray
6
  sdk: gradio
7
  sdk_version: 5.32.0
8
  app_file: app.py
9
  pinned: false
10
+ license: mit
11
+ short_description: Demo of Normalized Attention Guidance for FLUX.1-dev
12
  ---
13
 
14
+ [[arXiv Paper]](https://arxiv.org/abs/2505.21179) [[Project Page]](https://chendaryen.github.io/NAG.github.io/)
15
  Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
app.py ADDED
@@ -0,0 +1,161 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import random
2
+ import os
3
+
4
+ import spaces
5
+ import torch
6
+ from PIL import Image
7
+ import huggingface_hub
8
+ import gradio as gr
9
+
10
+ from src.pipeline_flux_nag import NAGFluxPipeline
11
+ from src.transformer_flux import NAGFluxTransformer2DModel
12
+
13
+
14
+ theme = gr.themes.Base(
15
+ font=[gr.themes.GoogleFont('Libre Franklin'), gr.themes.GoogleFont('Public Sans'), 'system-ui', 'sans-serif'],
16
+ )
17
+
18
+ transformer = NAGFluxTransformer2DModel.from_pretrained(
19
+ "black-forest-labs/FLUX.1-dev",
20
+ subfolder="transformer",
21
+ torch_dtype=torch.bfloat16,
22
+ )
23
+ pipe = NAGFluxPipeline.from_pretrained(
24
+ "black-forest-labs/FLUX.1-dev",
25
+ transformer=transformer,
26
+ torch_dtype=torch.bfloat16,
27
+ )
28
+
29
+ device = "cuda"
30
+ pipe = pipe.to(device)
31
+
32
+ examples = [
33
+ ["Portrait of AI researcher.", "Glasses.", 5],
34
+ ["A beautiful cyborg.", "Robot.", 5],
35
+ ["Minimalist abstract line drawing: face portrait of a girl with long hair.", "Complex, detail.", 5],
36
+ ["A baby phoenix made of fire and flames is born from the smoking ashes.", "Low resolution, blurry, lack of details, illustration, cartoon, painting.", 5],
37
+ ["A tiny astronaut hatching from an egg on the moon.", "Low resolution, blurry, lack of details, illustration, cartoon, painting.", 5]
38
+ ]
39
+
40
+ @spaces.GPU
41
+ def sample(
42
+ prompt,
43
+ negative_prompt=None, guidance_scale=3.5,
44
+ nag_negative_prompt=None, nag_scale=5.0,
45
+ num_inference_steps=25,
46
+ seed=2025, randomize_seed=False,
47
+ compare=True,
48
+ ):
49
+ prompt = prompt.strip()
50
+ negative_prompt = negative_prompt.strip() if negative_prompt and negative_prompt.strip() else None
51
+ guidance_scale = float(guidance_scale)
52
+ num_inference_steps = int(num_inference_steps)
53
+
54
+ if (randomize_seed):
55
+ seed = random.randint(0, 9007199254740991)
56
+ else:
57
+ seed = int(seed)
58
+
59
+ generator = torch.Generator(device="cuda").manual_seed(seed)
60
+ image_nag = pipe(
61
+ prompt,
62
+ negative_prompt=negative_prompt,
63
+ guidance_scale=guidance_scale,
64
+ nag_negative_prompt=nag_negative_prompt,
65
+ nag_scale=nag_scale,
66
+ generator=generator,
67
+ num_inference_steps=num_inference_steps,
68
+ ).images[0]
69
+
70
+ if compare:
71
+ generator = torch.Generator(device="cuda").manual_seed(seed)
72
+ image_normal = pipe(
73
+ prompt,
74
+ negative_prompt=negative_prompt,
75
+ guidance_scale=guidance_scale,
76
+ generator=generator,
77
+ num_inference_steps=num_inference_steps,
78
+ ).images[0]
79
+ else:
80
+ image_normal = Image.new("RGB", image_nag.size, color=(0, 0, 0))
81
+
82
+ return (image_normal, image_nag), seed
83
+
84
+
85
+ def sample_example(
86
+ prompt,
87
+ nag_negative_prompt,
88
+ nag_scale,
89
+ ):
90
+ outputs, seed = sample(
91
+ prompt=prompt,
92
+ nag_negative_prompt=nag_negative_prompt,
93
+ nag_scale=nag_scale,
94
+ )
95
+ return outputs, 3.5, 25, seed, True
96
+
97
+
98
+ css = '''
99
+ .gradio-container{
100
+ max-width: 768px !important;
101
+ margin: 0 auto;
102
+ }
103
+ '''
104
+
105
+ with gr.Blocks(css=css, theme=theme) as demo:
106
+ gr.Markdown('''# Normalized Attention Guidance (NAG) Flux-Dev
107
+ Implementation of [Normalized Attention Guidance](https://chendaryen.github.io/NAG.github.io/)
108
+ ''')
109
+ with gr.Group():
110
+ prompt = gr.Textbox(
111
+ label="Prompt",
112
+ max_lines=1,
113
+ placeholder="Enter your prompt",
114
+ )
115
+ nag_negative_prompt = gr.Textbox(
116
+ label="Negative Prompt for NAG",
117
+ value="Low resolution, blurry, lack of details, illustration, cartoon, painting.",
118
+ max_lines=1,
119
+ )
120
+ nag_scale = gr.Slider(label="NAG Scale", minimum=1., maximum=20., step=0.25, value=5.)
121
+ compare = gr.Checkbox(label="Compare with baseline", info="If unchecked, only sample with NAG will be generated.", value=True)
122
+ button = gr.Button("Generate", min_width=120)
123
+ output = gr.ImageSlider(label="Left: Baseline, Right: With NAG", interactive=False)
124
+ with gr.Accordion("Advanced Settings", open=False):
125
+ negative_prompt = gr.Textbox(label="Negative Prompt", value=None, visible=False)
126
+ guidance_scale = gr.Slider(label="Guidance Scale", minimum=1., maximum=15., step=0.1, value=3.5)
127
+ num_inference_steps = gr.Slider(label="Inference Steps", minimum=1, maximum=50, step=1, value=25)
128
+ seed = gr.Slider(label="Seed", minimum=1, maximum=9007199254740991, step=1, randomize=True)
129
+ randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
130
+
131
+ gr.Examples(
132
+ examples=examples,
133
+ fn=sample_example,
134
+ inputs=[
135
+ prompt,
136
+ nag_negative_prompt,
137
+ nag_scale,
138
+ ],
139
+ outputs=[output, guidance_scale, num_inference_steps, seed, compare],
140
+ cache_examples="lazy",
141
+ )
142
+
143
+ gr.on(
144
+ triggers=[
145
+ button.click,
146
+ prompt.submit
147
+ ],
148
+ fn=sample,
149
+ inputs=[
150
+ prompt,
151
+ negative_prompt, guidance_scale,
152
+ nag_negative_prompt, nag_scale,
153
+ num_inference_steps,
154
+ seed, randomize_seed,
155
+ compare,
156
+ ],
157
+ outputs=[output, seed],
158
+ )
159
+ if __name__ == "__main__":
160
+ huggingface_hub.login(os.getenv('HF_TOKEN'))
161
+ demo.launch(share=True)
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ accelerate
2
+ diffusers
3
+ torch
4
+ transformers
5
+ sentencepiece
src/__init__.py ADDED
File without changes
src/attention_flux_nag.py ADDED
@@ -0,0 +1,176 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import math
2
+ from typing import Optional
3
+
4
+ import torch
5
+ import torch.nn.functional as F
6
+
7
+ from diffusers.models.attention_processor import Attention
8
+ from diffusers.models.embeddings import apply_rotary_emb
9
+
10
+
11
+ class NAGFluxAttnProcessor2_0:
12
+ """Attention processor used typically in processing the SD3-like self-attention projections."""
13
+
14
+ def __init__(
15
+ self,
16
+ nag_scale: float = 1.0,
17
+ nag_tau=2.5,
18
+ nag_alpha=0.25,
19
+ encoder_hidden_states_length: int = None,
20
+ ):
21
+ if not hasattr(F, "scaled_dot_product_attention"):
22
+ raise ImportError("FluxAttnProcessor2_0 requires PyTorch 2.0, to use it, please upgrade PyTorch to 2.0.")
23
+ self.nag_scale = nag_scale
24
+ self.nag_tau = nag_tau
25
+ self.nag_alpha = nag_alpha
26
+ self.encoder_hidden_states_length = encoder_hidden_states_length
27
+
28
+ def __call__(
29
+ self,
30
+ attn: Attention,
31
+ hidden_states: torch.FloatTensor,
32
+ encoder_hidden_states: torch.FloatTensor = None,
33
+ attention_mask: Optional[torch.FloatTensor] = None,
34
+ image_rotary_emb: Optional[torch.Tensor] = None,
35
+ ) -> torch.FloatTensor:
36
+ batch_size, _, _ = hidden_states.shape if encoder_hidden_states is None else encoder_hidden_states.shape
37
+
38
+ if self.nag_scale > 1.:
39
+ if encoder_hidden_states is not None:
40
+ assert len(hidden_states) == batch_size * 0.5
41
+ apply_guidance = True
42
+ else:
43
+ apply_guidance = False
44
+
45
+ # `sample` projections.
46
+ query = attn.to_q(hidden_states)
47
+ key = attn.to_k(hidden_states)
48
+ value = attn.to_v(hidden_states)
49
+
50
+ # attention
51
+ if apply_guidance and encoder_hidden_states is not None:
52
+ query = query.tile(2, 1, 1)
53
+ key = key.tile(2, 1, 1)
54
+ value = value.tile(2, 1, 1)
55
+
56
+ inner_dim = key.shape[-1]
57
+ head_dim = inner_dim // attn.heads
58
+
59
+ query = query.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
60
+ key = key.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
61
+ value = value.view(batch_size, -1, attn.heads, head_dim).transpose(1, 2)
62
+
63
+ if attn.norm_q is not None:
64
+ query = attn.norm_q(query)
65
+ if attn.norm_k is not None:
66
+ key = attn.norm_k(key)
67
+
68
+ # the attention in FluxSingleTransformerBlock does not use `encoder_hidden_states`
69
+ if encoder_hidden_states is not None:
70
+ # `context` projections.
71
+ encoder_hidden_states_query_proj = attn.add_q_proj(encoder_hidden_states)
72
+ encoder_hidden_states_key_proj = attn.add_k_proj(encoder_hidden_states)
73
+ encoder_hidden_states_value_proj = attn.add_v_proj(encoder_hidden_states)
74
+
75
+ encoder_hidden_states_query_proj = encoder_hidden_states_query_proj.view(
76
+ batch_size, -1, attn.heads, head_dim
77
+ ).transpose(1, 2)
78
+ encoder_hidden_states_key_proj = encoder_hidden_states_key_proj.view(
79
+ batch_size, -1, attn.heads, head_dim
80
+ ).transpose(1, 2)
81
+ encoder_hidden_states_value_proj = encoder_hidden_states_value_proj.view(
82
+ batch_size, -1, attn.heads, head_dim
83
+ ).transpose(1, 2)
84
+
85
+ if attn.norm_added_q is not None:
86
+ encoder_hidden_states_query_proj = attn.norm_added_q(encoder_hidden_states_query_proj)
87
+ if attn.norm_added_k is not None:
88
+ encoder_hidden_states_key_proj = attn.norm_added_k(encoder_hidden_states_key_proj)
89
+
90
+ query = torch.cat([encoder_hidden_states_query_proj, query], dim=2)
91
+ key = torch.cat([encoder_hidden_states_key_proj, key], dim=2)
92
+ value = torch.cat([encoder_hidden_states_value_proj, value], dim=2)
93
+
94
+ encoder_hidden_states_length = encoder_hidden_states.shape[1]
95
+
96
+ else:
97
+ assert self.encoder_hidden_states_length is not None
98
+ encoder_hidden_states_length = self.encoder_hidden_states_length
99
+
100
+ if image_rotary_emb is not None:
101
+ query = apply_rotary_emb(query, image_rotary_emb)
102
+ key = apply_rotary_emb(key, image_rotary_emb)
103
+
104
+ if not apply_guidance:
105
+ hidden_states = F.scaled_dot_product_attention(query, key, value, dropout_p=0.0, is_causal=False)
106
+ hidden_states = hidden_states.transpose(1, 2).reshape(batch_size, -1, attn.heads * head_dim)
107
+ hidden_states = hidden_states.to(query.dtype)
108
+
109
+ else:
110
+ origin_batch_size = batch_size // 2
111
+ query, query_negative = torch.chunk(query, 2, dim=0)
112
+ key, key_negative = torch.chunk(key, 2, dim=0)
113
+ value, value_negative = torch.chunk(value, 2, dim=0)
114
+
115
+ hidden_states_negative = F.scaled_dot_product_attention(query_negative, key_negative, value_negative, dropout_p=0.0, is_causal=False)
116
+ hidden_states_negative = hidden_states_negative.transpose(1, 2).reshape(origin_batch_size, -1, attn.heads * head_dim)
117
+ hidden_states_negative = hidden_states_negative.to(query.dtype)
118
+
119
+ hidden_states = F.scaled_dot_product_attention(query, key, value, dropout_p=0.0, is_causal=False)
120
+ hidden_states = hidden_states.transpose(1, 2).reshape(origin_batch_size, -1, attn.heads * head_dim)
121
+ hidden_states = hidden_states.to(query.dtype)
122
+
123
+ if encoder_hidden_states is not None:
124
+ encoder_hidden_states, hidden_states = (
125
+ hidden_states[:, : encoder_hidden_states.shape[1]],
126
+ hidden_states[:, encoder_hidden_states.shape[1] :],
127
+ )
128
+
129
+ if apply_guidance:
130
+ encoder_hidden_states_negative, hidden_states_negative = (
131
+ hidden_states_negative[:, : encoder_hidden_states.shape[1]],
132
+ hidden_states_negative[:, encoder_hidden_states.shape[1]:],
133
+ )
134
+ hidden_states_positive = hidden_states
135
+ hidden_states_guidance = hidden_states_positive * self.nag_scale - hidden_states_negative * (self.nag_scale - 1)
136
+ norm_positive = torch.norm(hidden_states_positive, p=2, dim=-1, keepdim=True).expand(*hidden_states_positive.shape)
137
+ norm_guidance = torch.norm(hidden_states_guidance, p=2, dim=-1, keepdim=True).expand(*hidden_states_positive.shape)
138
+
139
+ scale = norm_guidance / norm_positive
140
+ hidden_states_guidance = hidden_states_guidance * torch.minimum(scale, scale.new_ones(1) * self.nag_tau) / scale
141
+
142
+ hidden_states = hidden_states_guidance * self.nag_alpha + hidden_states_positive * (1 - self.nag_alpha)
143
+
144
+ encoder_hidden_states = torch.cat((encoder_hidden_states, encoder_hidden_states_negative), dim=0)
145
+
146
+ # linear proj
147
+ hidden_states = attn.to_out[0](hidden_states)
148
+ # dropout
149
+ hidden_states = attn.to_out[1](hidden_states)
150
+
151
+ encoder_hidden_states = attn.to_add_out(encoder_hidden_states)
152
+
153
+ return hidden_states, encoder_hidden_states
154
+
155
+ else:
156
+ if apply_guidance:
157
+ image_hidden_states_negative = hidden_states_negative[:, encoder_hidden_states_length:]
158
+ image_hidden_states = hidden_states[:, encoder_hidden_states_length:]
159
+
160
+ image_hidden_states_positive = image_hidden_states
161
+ image_hidden_states_guidance = image_hidden_states_positive * self.nag_scale - image_hidden_states_negative * (self.nag_scale - 1)
162
+ norm_positive = torch.norm(image_hidden_states_positive, p=2, dim=-1, keepdim=True).expand(*image_hidden_states_positive.shape)
163
+ norm_guidance = torch.norm(image_hidden_states_guidance, p=2, dim=-1, keepdim=True).expand(*image_hidden_states_positive.shape)
164
+
165
+ scale = norm_guidance / norm_positive
166
+ image_hidden_states_guidance = image_hidden_states_guidance * torch.minimum(scale, scale.new_ones(1) * self.nag_tau) / scale
167
+ # scale = torch.nan_to_num(scale, 10)
168
+ # image_hidden_states_guidance[scale > self.nag_tau] = image_hidden_states_guidance[scale > self.nag_tau] / (norm_guidance[scale > self.nag_tau] + 1e-7) * norm_positive[scale > self.nag_tau] * self.nag_tau
169
+
170
+ image_hidden_states = image_hidden_states_guidance * self.nag_alpha + image_hidden_states_positive * (1 - self.nag_alpha)
171
+
172
+ hidden_states_negative[:, encoder_hidden_states_length:] = image_hidden_states
173
+ hidden_states[:, encoder_hidden_states_length:] = image_hidden_states
174
+ hidden_states = torch.cat((hidden_states, hidden_states_negative), dim=0)
175
+
176
+ return hidden_states
src/normalization.py ADDED
@@ -0,0 +1,54 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Optional, Tuple
2
+
3
+ import torch
4
+ from diffusers.models.normalization import AdaLayerNorm, AdaLayerNormContinuous, AdaLayerNormZero, SD35AdaLayerNormZeroX
5
+
6
+
7
+ class TruncAdaLayerNorm(AdaLayerNorm):
8
+ def forward(
9
+ self, x: torch.Tensor, timestep: Optional[torch.Tensor] = None, temb: Optional[torch.Tensor] = None
10
+ ) -> torch.Tensor:
11
+ batch_size = x.shape[0]
12
+ return self.forward_old(
13
+ x,
14
+ temb[:batch_size] if temb is not None else None,
15
+ )
16
+
17
+
18
+ class TruncAdaLayerNormContinuous(AdaLayerNormContinuous):
19
+ def forward(self, x: torch.Tensor, conditioning_embedding: torch.Tensor) -> torch.Tensor:
20
+ batch_size = x.shape[0]
21
+ return self.forward_old(x, conditioning_embedding[:batch_size])
22
+
23
+
24
+ class TruncAdaLayerNormZero(AdaLayerNormZero):
25
+ def forward(
26
+ self,
27
+ x: torch.Tensor,
28
+ timestep: Optional[torch.Tensor] = None,
29
+ class_labels: Optional[torch.LongTensor] = None,
30
+ hidden_dtype: Optional[torch.dtype] = None,
31
+ emb: Optional[torch.Tensor] = None,
32
+ ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
33
+ batch_size = x.shape[0]
34
+ return self.forward_old(
35
+ x,
36
+ timestep[:batch_size] if timestep is not None else None,
37
+ class_labels[:batch_size] if class_labels is not None else None,
38
+ hidden_dtype,
39
+ emb[:batch_size] if emb is not None else None,
40
+ )
41
+
42
+
43
+ class TruncSD35AdaLayerNormZeroX(SD35AdaLayerNormZeroX):
44
+ def forward(
45
+ self,
46
+ hidden_states: torch.Tensor,
47
+ emb: Optional[torch.Tensor] = None,
48
+ ) -> Tuple[torch.Tensor, ...]:
49
+ batch_size = hidden_states.shape[0]
50
+ return self.forward_old(
51
+ hidden_states,
52
+ emb[:batch_size] if emb is not None else None,
53
+ )
54
+
src/pipeline_flux_nag.py ADDED
@@ -0,0 +1,461 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import math
2
+ from typing import Union, List, Optional, Dict, Any, Callable
3
+ import types
4
+
5
+ import numpy as np
6
+ import torch
7
+
8
+ from diffusers import FluxPipeline
9
+ from diffusers.pipelines.flux.pipeline_flux import calculate_shift, retrieve_timesteps
10
+ from diffusers.image_processor import PipelineImageInput
11
+ from diffusers.utils import is_torch_xla_available
12
+ from diffusers.pipelines.flux.pipeline_output import FluxPipelineOutput
13
+ from diffusers.models.normalization import AdaLayerNormZero, AdaLayerNormContinuous
14
+
15
+ from src.attention_flux_nag import NAGFluxAttnProcessor2_0
16
+ from src.normalization import TruncAdaLayerNormZero, TruncAdaLayerNormContinuous
17
+
18
+ if is_torch_xla_available():
19
+ import torch_xla.core.xla_model as xm
20
+
21
+ XLA_AVAILABLE = True
22
+ else:
23
+ XLA_AVAILABLE = False
24
+
25
+
26
+ class NAGFluxPipeline(FluxPipeline):
27
+ @property
28
+ def do_normalized_attention_guidance(self):
29
+ return self._nag_scale > 1
30
+
31
+ def _set_nag_attn_processor(
32
+ self,
33
+ nag_scale,
34
+ encoder_hidden_states_length,
35
+ nag_tau=2.5,
36
+ nag_alpha=0.25,
37
+ ):
38
+ attn_procs = {}
39
+ for name in self.transformer.attn_processors.keys():
40
+ attn_procs[name] = NAGFluxAttnProcessor2_0(
41
+ nag_scale=nag_scale,
42
+ nag_tau=nag_tau,
43
+ nag_alpha=nag_alpha,
44
+ encoder_hidden_states_length=encoder_hidden_states_length,
45
+ )
46
+ self.transformer.set_attn_processor(attn_procs)
47
+
48
+ @torch.no_grad()
49
+ def __call__(
50
+ self,
51
+ prompt: Union[str, List[str]] = None,
52
+ prompt_2: Optional[Union[str, List[str]]] = None,
53
+ negative_prompt: Union[str, List[str]] = None,
54
+ negative_prompt_2: Optional[Union[str, List[str]]] = None,
55
+ true_cfg_scale: float = 1.0,
56
+ height: Optional[int] = None,
57
+ width: Optional[int] = None,
58
+ num_inference_steps: int = 28,
59
+ sigmas: Optional[List[float]] = None,
60
+ guidance_scale: float = 3.5,
61
+ num_images_per_prompt: Optional[int] = 1,
62
+ generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
63
+ latents: Optional[torch.FloatTensor] = None,
64
+ prompt_embeds: Optional[torch.FloatTensor] = None,
65
+ pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
66
+ ip_adapter_image: Optional[PipelineImageInput] = None,
67
+ ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
68
+ negative_ip_adapter_image: Optional[PipelineImageInput] = None,
69
+ negative_ip_adapter_image_embeds: Optional[List[torch.Tensor]] = None,
70
+ negative_prompt_embeds: Optional[torch.FloatTensor] = None,
71
+ negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
72
+ output_type: Optional[str] = "pil",
73
+ return_dict: bool = True,
74
+ joint_attention_kwargs: Optional[Dict[str, Any]] = None,
75
+ callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
76
+ callback_on_step_end_tensor_inputs: List[str] = ["latents"],
77
+ max_sequence_length: int = 512,
78
+
79
+ nag_scale: float = 1.0,
80
+ nag_tau: float = 2.5,
81
+ nag_alpha: float = 0.25,
82
+ nag_end: float = 1.0,
83
+ nag_negative_prompt: str = None,
84
+ nag_negative_prompt_2: str = None,
85
+ nag_negative_prompt_embeds: Optional[torch.Tensor] = None,
86
+ nag_negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
87
+ ):
88
+ r"""
89
+ Function invoked when calling the pipeline for generation.
90
+
91
+ Args:
92
+ prompt (`str` or `List[str]`, *optional*):
93
+ The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.
94
+ instead.
95
+ prompt_2 (`str` or `List[str]`, *optional*):
96
+ The prompt or prompts to be sent to `tokenizer_2` and `text_encoder_2`. If not defined, `prompt` is
97
+ will be used instead.
98
+ negative_prompt (`str` or `List[str]`, *optional*):
99
+ The prompt or prompts not to guide the image generation. If not defined, one has to pass
100
+ `negative_prompt_embeds` instead. Ignored when not using guidance (i.e., ignored if `true_cfg_scale` is
101
+ not greater than `1`).
102
+ negative_prompt_2 (`str` or `List[str]`, *optional*):
103
+ The prompt or prompts not to guide the image generation to be sent to `tokenizer_2` and
104
+ `text_encoder_2`. If not defined, `negative_prompt` is used in all the text-encoders.
105
+ true_cfg_scale (`float`, *optional*, defaults to 1.0):
106
+ When > 1.0 and a provided `negative_prompt`, enables true classifier-free guidance.
107
+ height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
108
+ The height in pixels of the generated image. This is set to 1024 by default for the best results.
109
+ width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):
110
+ The width in pixels of the generated image. This is set to 1024 by default for the best results.
111
+ num_inference_steps (`int`, *optional*, defaults to 50):
112
+ The number of denoising steps. More denoising steps usually lead to a higher quality image at the
113
+ expense of slower inference.
114
+ sigmas (`List[float]`, *optional*):
115
+ Custom sigmas to use for the denoising process with schedulers which support a `sigmas` argument in
116
+ their `set_timesteps` method. If not defined, the default behavior when `num_inference_steps` is passed
117
+ will be used.
118
+ guidance_scale (`float`, *optional*, defaults to 7.0):
119
+ Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
120
+ `guidance_scale` is defined as `w` of equation 2. of [Imagen
121
+ Paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
122
+ 1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,
123
+ usually at the expense of lower image quality.
124
+ num_images_per_prompt (`int`, *optional*, defaults to 1):
125
+ The number of images to generate per prompt.
126
+ generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
127
+ One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)
128
+ to make generation deterministic.
129
+ latents (`torch.FloatTensor`, *optional*):
130
+ Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image
131
+ generation. Can be used to tweak the same generation with different prompts. If not provided, a latents
132
+ tensor will ge generated by sampling using the supplied random `generator`.
133
+ prompt_embeds (`torch.FloatTensor`, *optional*):
134
+ Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not
135
+ provided, text embeddings will be generated from `prompt` input argument.
136
+ pooled_prompt_embeds (`torch.FloatTensor`, *optional*):
137
+ Pre-generated pooled text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting.
138
+ If not provided, pooled text embeddings will be generated from `prompt` input argument.
139
+ ip_adapter_image: (`PipelineImageInput`, *optional*): Optional image input to work with IP Adapters.
140
+ ip_adapter_image_embeds (`List[torch.Tensor]`, *optional*):
141
+ Pre-generated image embeddings for IP-Adapter. It should be a list of length same as number of
142
+ IP-adapters. Each element should be a tensor of shape `(batch_size, num_images, emb_dim)`. If not
143
+ provided, embeddings are computed from the `ip_adapter_image` input argument.
144
+ negative_ip_adapter_image:
145
+ (`PipelineImageInput`, *optional*): Optional image input to work with IP Adapters.
146
+ negative_ip_adapter_image_embeds (`List[torch.Tensor]`, *optional*):
147
+ Pre-generated image embeddings for IP-Adapter. It should be a list of length same as number of
148
+ IP-adapters. Each element should be a tensor of shape `(batch_size, num_images, emb_dim)`. If not
149
+ provided, embeddings are computed from the `ip_adapter_image` input argument.
150
+ negative_prompt_embeds (`torch.FloatTensor`, *optional*):
151
+ Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
152
+ weighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` input
153
+ argument.
154
+ negative_pooled_prompt_embeds (`torch.FloatTensor`, *optional*):
155
+ Pre-generated negative pooled text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt
156
+ weighting. If not provided, pooled negative_prompt_embeds will be generated from `negative_prompt`
157
+ input argument.
158
+ output_type (`str`, *optional*, defaults to `"pil"`):
159
+ The output format of the generate image. Choose between
160
+ [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
161
+ return_dict (`bool`, *optional*, defaults to `True`):
162
+ Whether or not to return a [`~pipelines.flux.FluxPipelineOutput`] instead of a plain tuple.
163
+ joint_attention_kwargs (`dict`, *optional*):
164
+ A kwargs dictionary that if specified is passed along to the `AttentionProcessor` as defined under
165
+ `self.processor` in
166
+ [diffusers.models.attention_processor](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
167
+ callback_on_step_end (`Callable`, *optional*):
168
+ A function that calls at the end of each denoising steps during the inference. The function is called
169
+ with the following arguments: `callback_on_step_end(self: DiffusionPipeline, step: int, timestep: int,
170
+ callback_kwargs: Dict)`. `callback_kwargs` will include a list of all tensors as specified by
171
+ `callback_on_step_end_tensor_inputs`.
172
+ callback_on_step_end_tensor_inputs (`List`, *optional*):
173
+ The list of tensor inputs for the `callback_on_step_end` function. The tensors specified in the list
174
+ will be passed as `callback_kwargs` argument. You will only be able to include variables listed in the
175
+ `._callback_tensor_inputs` attribute of your pipeline class.
176
+ max_sequence_length (`int` defaults to 512): Maximum sequence length to use with the `prompt`.
177
+
178
+ Examples:
179
+
180
+ Returns:
181
+ [`~pipelines.flux.FluxPipelineOutput`] or `tuple`: [`~pipelines.flux.FluxPipelineOutput`] if `return_dict`
182
+ is True, otherwise a `tuple`. When returning a tuple, the first element is a list with the generated
183
+ images.
184
+ """
185
+
186
+ height = height or self.default_sample_size * self.vae_scale_factor
187
+ width = width or self.default_sample_size * self.vae_scale_factor
188
+
189
+ # 1. Check inputs. Raise error if not correct
190
+ self.check_inputs(
191
+ prompt,
192
+ prompt_2,
193
+ height,
194
+ width,
195
+ negative_prompt=negative_prompt,
196
+ negative_prompt_2=negative_prompt_2,
197
+ prompt_embeds=prompt_embeds,
198
+ negative_prompt_embeds=negative_prompt_embeds,
199
+ pooled_prompt_embeds=pooled_prompt_embeds,
200
+ negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,
201
+ callback_on_step_end_tensor_inputs=callback_on_step_end_tensor_inputs,
202
+ max_sequence_length=max_sequence_length,
203
+ )
204
+
205
+ self._guidance_scale = guidance_scale
206
+ self._joint_attention_kwargs = joint_attention_kwargs
207
+ self._current_timestep = None
208
+ self._interrupt = False
209
+ self._nag_scale = nag_scale
210
+
211
+ # 2. Define call parameters
212
+ if prompt is not None and isinstance(prompt, str):
213
+ batch_size = 1
214
+ elif prompt is not None and isinstance(prompt, list):
215
+ batch_size = len(prompt)
216
+ else:
217
+ batch_size = prompt_embeds.shape[0]
218
+
219
+ device = self._execution_device
220
+
221
+ lora_scale = (
222
+ self.joint_attention_kwargs.get("scale", None) if self.joint_attention_kwargs is not None else None
223
+ )
224
+ has_neg_prompt = negative_prompt is not None or (
225
+ negative_prompt_embeds is not None and negative_pooled_prompt_embeds is not None
226
+ )
227
+ do_true_cfg = true_cfg_scale > 1 and has_neg_prompt
228
+ # do_true_cfg = do_true_cfg or self.do_normalized_attention_guidance
229
+
230
+ (
231
+ prompt_embeds,
232
+ pooled_prompt_embeds,
233
+ text_ids,
234
+ ) = self.encode_prompt(
235
+ prompt=prompt,
236
+ prompt_2=prompt_2,
237
+ prompt_embeds=prompt_embeds,
238
+ pooled_prompt_embeds=pooled_prompt_embeds,
239
+ device=device,
240
+ num_images_per_prompt=num_images_per_prompt,
241
+ max_sequence_length=max_sequence_length,
242
+ lora_scale=lora_scale,
243
+ )
244
+ if do_true_cfg:
245
+ (
246
+ negative_prompt_embeds,
247
+ negative_pooled_prompt_embeds,
248
+ _,
249
+ ) = self.encode_prompt(
250
+ prompt=negative_prompt,
251
+ prompt_2=negative_prompt_2,
252
+ prompt_embeds=negative_prompt_embeds,
253
+ pooled_prompt_embeds=negative_pooled_prompt_embeds,
254
+ device=device,
255
+ num_images_per_prompt=num_images_per_prompt,
256
+ max_sequence_length=max_sequence_length,
257
+ lora_scale=lora_scale,
258
+ )
259
+
260
+ if self.do_normalized_attention_guidance:
261
+ if nag_negative_prompt_embeds is None or nag_negative_pooled_prompt_embeds is None:
262
+ if nag_negative_prompt is None:
263
+ if negative_prompt is not None:
264
+ if do_true_cfg:
265
+ nag_negative_prompt_embeds = negative_prompt_embeds
266
+ nag_negative_pooled_prompt_embeds = negative_pooled_prompt_embeds
267
+ else:
268
+ nag_negative_prompt = negative_prompt
269
+ nag_negative_prompt_2 = negative_prompt_2
270
+ else:
271
+ nag_negative_prompt = ""
272
+
273
+ if nag_negative_prompt is not None:
274
+ nag_negative_prompt_embeds, nag_negative_pooled_prompt_embeds = self.encode_prompt(
275
+ prompt=nag_negative_prompt,
276
+ prompt_2=nag_negative_prompt_2,
277
+ device=device,
278
+ num_images_per_prompt=num_images_per_prompt,
279
+ max_sequence_length=max_sequence_length,
280
+ lora_scale=lora_scale,
281
+ )[:2]
282
+
283
+ if self.do_normalized_attention_guidance:
284
+ pooled_prompt_embeds = torch.cat([pooled_prompt_embeds, nag_negative_pooled_prompt_embeds], dim=0)
285
+ prompt_embeds = torch.cat([prompt_embeds, nag_negative_prompt_embeds], dim=0)
286
+
287
+ # 4. Prepare latent variables
288
+ num_channels_latents = self.transformer.config.in_channels // 4
289
+ latents, latent_image_ids = self.prepare_latents(
290
+ batch_size * num_images_per_prompt,
291
+ num_channels_latents,
292
+ height,
293
+ width,
294
+ prompt_embeds.dtype,
295
+ device,
296
+ generator,
297
+ latents,
298
+ )
299
+
300
+ # 5. Prepare timesteps
301
+ sigmas = np.linspace(1.0, 1 / num_inference_steps, num_inference_steps) if sigmas is None else sigmas
302
+ image_seq_len = latents.shape[1]
303
+ mu = calculate_shift(
304
+ image_seq_len,
305
+ self.scheduler.config.get("base_image_seq_len", 256),
306
+ self.scheduler.config.get("max_image_seq_len", 4096),
307
+ self.scheduler.config.get("base_shift", 0.5),
308
+ self.scheduler.config.get("max_shift", 1.16),
309
+ )
310
+ timesteps, num_inference_steps = retrieve_timesteps(
311
+ self.scheduler,
312
+ num_inference_steps,
313
+ device,
314
+ sigmas=sigmas,
315
+ mu=mu,
316
+ )
317
+ num_warmup_steps = max(len(timesteps) - num_inference_steps * self.scheduler.order, 0)
318
+ self._num_timesteps = len(timesteps)
319
+
320
+ # handle guidance
321
+ if self.transformer.config.guidance_embeds:
322
+ guidance = torch.full([1], guidance_scale, device=device, dtype=torch.float32)
323
+ guidance = guidance.expand(prompt_embeds.shape[0])
324
+ else:
325
+ guidance = None
326
+
327
+ if (ip_adapter_image is not None or ip_adapter_image_embeds is not None) and (
328
+ negative_ip_adapter_image is None and negative_ip_adapter_image_embeds is None
329
+ ):
330
+ negative_ip_adapter_image = np.zeros((width, height, 3), dtype=np.uint8)
331
+ elif (ip_adapter_image is None and ip_adapter_image_embeds is None) and (
332
+ negative_ip_adapter_image is not None or negative_ip_adapter_image_embeds is not None
333
+ ):
334
+ ip_adapter_image = np.zeros((width, height, 3), dtype=np.uint8)
335
+
336
+ if self.joint_attention_kwargs is None:
337
+ self._joint_attention_kwargs = {}
338
+
339
+ image_embeds = None
340
+ negative_image_embeds = None
341
+ if ip_adapter_image is not None or ip_adapter_image_embeds is not None:
342
+ image_embeds = self.prepare_ip_adapter_image_embeds(
343
+ ip_adapter_image,
344
+ ip_adapter_image_embeds,
345
+ device,
346
+ batch_size * num_images_per_prompt,
347
+ )
348
+ if negative_ip_adapter_image is not None or negative_ip_adapter_image_embeds is not None:
349
+ negative_image_embeds = self.prepare_ip_adapter_image_embeds(
350
+ negative_ip_adapter_image,
351
+ negative_ip_adapter_image_embeds,
352
+ device,
353
+ batch_size * num_images_per_prompt,
354
+ )
355
+
356
+ origin_attn_procs = self.transformer.attn_processors
357
+ if self.do_normalized_attention_guidance:
358
+ self._set_nag_attn_processor(nag_scale, prompt_embeds.shape[1], nag_tau, nag_alpha)
359
+ attn_procs_recovered = False
360
+
361
+ for sub_mod in self.transformer.modules():
362
+ if not hasattr(sub_mod, "forward_old") :
363
+ sub_mod.forward_old = sub_mod.forward
364
+ if isinstance(sub_mod, AdaLayerNormZero):
365
+ sub_mod.forward = types.MethodType(TruncAdaLayerNormZero.forward, sub_mod)
366
+ elif isinstance(sub_mod, AdaLayerNormContinuous):
367
+ sub_mod.forward = types.MethodType(TruncAdaLayerNormContinuous.forward, sub_mod)
368
+
369
+ # 6. Denoising loop
370
+ with self.progress_bar(total=num_inference_steps) as progress_bar:
371
+ for i, t in enumerate(timesteps):
372
+ if self.interrupt:
373
+ continue
374
+
375
+ if t < (1 - nag_end) * 1000 and self.do_normalized_attention_guidance and not attn_procs_recovered:
376
+ self.transformer.set_attn_processor(origin_attn_procs)
377
+ if guidance is not None:
378
+ guidance = guidance[:len(latents)]
379
+ pooled_prompt_embeds = pooled_prompt_embeds[:len(latents)]
380
+ prompt_embeds = prompt_embeds[:len(latents)]
381
+ attn_procs_recovered = True
382
+
383
+ self._current_timestep = t
384
+ if image_embeds is not None:
385
+ self._joint_attention_kwargs["ip_adapter_image_embeds"] = image_embeds
386
+ # broadcast to batch dimension in a way that's compatible with ONNX/Core ML
387
+ timestep = t.expand(prompt_embeds.shape[0]).to(latents.dtype)
388
+
389
+ noise_pred = self.transformer(
390
+ hidden_states=latents,
391
+ timestep=timestep / 1000,
392
+ guidance=guidance,
393
+ pooled_projections=pooled_prompt_embeds,
394
+ encoder_hidden_states=prompt_embeds,
395
+ txt_ids=text_ids,
396
+ img_ids=latent_image_ids,
397
+ joint_attention_kwargs=self.joint_attention_kwargs,
398
+ return_dict=False,
399
+ )[0]
400
+
401
+ if do_true_cfg:
402
+ if negative_image_embeds is not None:
403
+ self._joint_attention_kwargs["ip_adapter_image_embeds"] = negative_image_embeds
404
+ neg_noise_pred = self.transformer(
405
+ hidden_states=latents,
406
+ timestep=timestep / 1000,
407
+ guidance=guidance,
408
+ pooled_projections=negative_pooled_prompt_embeds,
409
+ encoder_hidden_states=negative_prompt_embeds,
410
+ txt_ids=text_ids,
411
+ img_ids=latent_image_ids,
412
+ joint_attention_kwargs=self.joint_attention_kwargs,
413
+ return_dict=False,
414
+ )[0]
415
+ noise_pred = neg_noise_pred + true_cfg_scale * (noise_pred - neg_noise_pred)
416
+
417
+ # compute the previous noisy sample x_t -> x_t-1
418
+ latents_dtype = latents.dtype
419
+ latents = self.scheduler.step(noise_pred, t, latents, return_dict=False)[0]
420
+
421
+ if latents.dtype != latents_dtype:
422
+ if torch.backends.mps.is_available():
423
+ # some platforms (eg. apple mps) misbehave due to a pytorch bug: https://github.com/pytorch/pytorch/pull/99272
424
+ latents = latents.to(latents_dtype)
425
+
426
+ if callback_on_step_end is not None:
427
+ callback_kwargs = {}
428
+ for k in callback_on_step_end_tensor_inputs:
429
+ callback_kwargs[k] = locals()[k]
430
+ callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)
431
+
432
+ latents = callback_outputs.pop("latents", latents)
433
+ prompt_embeds = callback_outputs.pop("prompt_embeds", prompt_embeds)
434
+
435
+ # call the callback, if provided
436
+ if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
437
+ progress_bar.update()
438
+
439
+ if XLA_AVAILABLE:
440
+ xm.mark_step()
441
+
442
+ self._current_timestep = None
443
+
444
+ if output_type == "latent":
445
+ image = latents
446
+ else:
447
+ latents = self._unpack_latents(latents, height, width, self.vae_scale_factor)
448
+ latents = (latents / self.vae.config.scaling_factor) + self.vae.config.shift_factor
449
+ image = self.vae.decode(latents, return_dict=False)[0]
450
+ image = self.image_processor.postprocess(image, output_type=output_type)
451
+
452
+ if self.do_normalized_attention_guidance and not attn_procs_recovered:
453
+ self.transformer.set_attn_processor(origin_attn_procs)
454
+
455
+ # Offload all models
456
+ self.maybe_free_model_hooks()
457
+
458
+ if not return_dict:
459
+ return (image,)
460
+
461
+ return FluxPipelineOutput(images=image)
src/transformer_flux.py ADDED
@@ -0,0 +1,187 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Any, Dict, Optional, Tuple, Union
2
+
3
+ import numpy as np
4
+ import torch
5
+
6
+ from diffusers.utils import USE_PEFT_BACKEND, logging, scale_lora_layers, unscale_lora_layers
7
+ from diffusers.models.modeling_outputs import Transformer2DModelOutput
8
+ from diffusers.models.transformers.transformer_flux import FluxTransformer2DModel
9
+
10
+
11
+ logger = logging.get_logger(__name__) # pylint: disable=invalid-name
12
+
13
+
14
+ class NAGFluxTransformer2DModel(FluxTransformer2DModel):
15
+ def forward(
16
+ self,
17
+ hidden_states: torch.Tensor,
18
+ encoder_hidden_states: torch.Tensor = None,
19
+ pooled_projections: torch.Tensor = None,
20
+ timestep: torch.LongTensor = None,
21
+ img_ids: torch.Tensor = None,
22
+ txt_ids: torch.Tensor = None,
23
+ guidance: torch.Tensor = None,
24
+ joint_attention_kwargs: Optional[Dict[str, Any]] = None,
25
+ controlnet_block_samples=None,
26
+ controlnet_single_block_samples=None,
27
+ return_dict: bool = True,
28
+ controlnet_blocks_repeat: bool = False,
29
+ ) -> Union[torch.Tensor, Transformer2DModelOutput]:
30
+ """
31
+ The [`FluxTransformer2DModel`] forward method.
32
+
33
+ Args:
34
+ hidden_states (`torch.Tensor` of shape `(batch_size, image_sequence_length, in_channels)`):
35
+ Input `hidden_states`.
36
+ encoder_hidden_states (`torch.Tensor` of shape `(batch_size, text_sequence_length, joint_attention_dim)`):
37
+ Conditional embeddings (embeddings computed from the input conditions such as prompts) to use.
38
+ pooled_projections (`torch.Tensor` of shape `(batch_size, projection_dim)`): Embeddings projected
39
+ from the embeddings of input conditions.
40
+ timestep ( `torch.LongTensor`):
41
+ Used to indicate denoising step.
42
+ block_controlnet_hidden_states: (`list` of `torch.Tensor`):
43
+ A list of tensors that if specified are added to the residuals of transformer blocks.
44
+ joint_attention_kwargs (`dict`, *optional*):
45
+ A kwargs dictionary that if specified is passed along to the `AttentionProcessor` as defined under
46
+ `self.processor` in
47
+ [diffusers.models.attention_processor](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/attention_processor.py).
48
+ return_dict (`bool`, *optional*, defaults to `True`):
49
+ Whether or not to return a [`~models.transformer_2d.Transformer2DModelOutput`] instead of a plain
50
+ tuple.
51
+
52
+ Returns:
53
+ If `return_dict` is True, an [`~models.transformer_2d.Transformer2DModelOutput`] is returned, otherwise a
54
+ `tuple` where the first element is the sample tensor.
55
+ """
56
+ if joint_attention_kwargs is not None:
57
+ joint_attention_kwargs = joint_attention_kwargs.copy()
58
+ lora_scale = joint_attention_kwargs.pop("scale", 1.0)
59
+ else:
60
+ lora_scale = 1.0
61
+
62
+ if USE_PEFT_BACKEND:
63
+ # weight the lora layers by setting `lora_scale` for each PEFT layer
64
+ scale_lora_layers(self, lora_scale)
65
+ else:
66
+ if joint_attention_kwargs is not None and joint_attention_kwargs.get("scale", None) is not None:
67
+ logger.warning(
68
+ "Passing `scale` via `joint_attention_kwargs` when not using the PEFT backend is ineffective."
69
+ )
70
+
71
+ do_nag = hidden_states.shape[0] != encoder_hidden_states.shape[0]
72
+
73
+ hidden_states = self.x_embedder(hidden_states)
74
+
75
+ timestep = timestep.to(hidden_states.dtype) * 1000
76
+ if guidance is not None:
77
+ guidance = guidance.to(hidden_states.dtype) * 1000
78
+ else:
79
+ guidance = None
80
+
81
+ temb = (
82
+ self.time_text_embed(timestep, pooled_projections)
83
+ if guidance is None
84
+ else self.time_text_embed(timestep, guidance, pooled_projections)
85
+ )
86
+ encoder_hidden_states = self.context_embedder(encoder_hidden_states)
87
+
88
+ if txt_ids.ndim == 3:
89
+ logger.warning(
90
+ "Passing `txt_ids` 3d torch.Tensor is deprecated."
91
+ "Please remove the batch dimension and pass it as a 2d torch Tensor"
92
+ )
93
+ txt_ids = txt_ids[0]
94
+ if img_ids.ndim == 3:
95
+ logger.warning(
96
+ "Passing `img_ids` 3d torch.Tensor is deprecated."
97
+ "Please remove the batch dimension and pass it as a 2d torch Tensor"
98
+ )
99
+ img_ids = img_ids[0]
100
+
101
+ ids = torch.cat((txt_ids, img_ids), dim=0)
102
+ image_rotary_emb = self.pos_embed(ids)
103
+
104
+ if joint_attention_kwargs is not None and "ip_adapter_image_embeds" in joint_attention_kwargs:
105
+ ip_adapter_image_embeds = joint_attention_kwargs.pop("ip_adapter_image_embeds")
106
+ ip_hidden_states = self.encoder_hid_proj(ip_adapter_image_embeds)
107
+ joint_attention_kwargs.update({"ip_hidden_states": ip_hidden_states})
108
+
109
+ for index_block, block in enumerate(self.transformer_blocks):
110
+ if torch.is_grad_enabled() and self.gradient_checkpointing:
111
+ encoder_hidden_states, hidden_states = self._gradient_checkpointing_func(
112
+ block,
113
+ hidden_states,
114
+ encoder_hidden_states,
115
+ temb,
116
+ image_rotary_emb,
117
+ )
118
+
119
+ else:
120
+ encoder_hidden_states, hidden_states = block(
121
+ hidden_states=hidden_states,
122
+ encoder_hidden_states=encoder_hidden_states,
123
+ temb=temb,
124
+ image_rotary_emb=image_rotary_emb,
125
+ joint_attention_kwargs=joint_attention_kwargs,
126
+ )
127
+
128
+ # controlnet residual
129
+ if controlnet_block_samples is not None:
130
+ interval_control = len(self.transformer_blocks) / len(controlnet_block_samples)
131
+ interval_control = int(np.ceil(interval_control))
132
+ # For Xlabs ControlNet.
133
+ if controlnet_blocks_repeat:
134
+ hidden_states = (
135
+ hidden_states + controlnet_block_samples[index_block % len(controlnet_block_samples)]
136
+ )
137
+ else:
138
+ hidden_states = hidden_states + controlnet_block_samples[index_block // interval_control]
139
+
140
+ if do_nag:
141
+ hidden_states = hidden_states.tile(2, 1, 1)
142
+ hidden_states = torch.cat([encoder_hidden_states, hidden_states], dim=1)
143
+
144
+ for index_block, block in enumerate(self.single_transformer_blocks):
145
+ if torch.is_grad_enabled() and self.gradient_checkpointing:
146
+ hidden_states = self._gradient_checkpointing_func(
147
+ block,
148
+ hidden_states,
149
+ temb,
150
+ image_rotary_emb,
151
+ )
152
+
153
+ else:
154
+ hidden_states = block(
155
+ hidden_states=hidden_states,
156
+ temb=temb,
157
+ image_rotary_emb=image_rotary_emb,
158
+ joint_attention_kwargs=joint_attention_kwargs,
159
+ )
160
+
161
+ # controlnet residual
162
+ if controlnet_single_block_samples is not None:
163
+ interval_control = len(self.single_transformer_blocks) / len(controlnet_single_block_samples)
164
+ interval_control = int(np.ceil(interval_control))
165
+ controlnet_single_block_sample = controlnet_single_block_samples[index_block // interval_control]
166
+ if do_nag:
167
+ controlnet_single_block_sample = controlnet_single_block_sample.tile(2, 1, 1)
168
+ hidden_states[:, encoder_hidden_states.shape[1] :, ...] = (
169
+ hidden_states[:, encoder_hidden_states.shape[1] :, ...] + controlnet_single_block_sample
170
+ )
171
+
172
+ hidden_states = hidden_states[:, encoder_hidden_states.shape[1] :, ...]
173
+
174
+ if do_nag:
175
+ hidden_states = torch.chunk(hidden_states, 2, dim=0)[0]
176
+
177
+ hidden_states = self.norm_out(hidden_states, temb)
178
+ output = self.proj_out(hidden_states)
179
+
180
+ if USE_PEFT_BACKEND:
181
+ # remove `lora_scale` from each PEFT layer
182
+ unscale_lora_layers(self, lora_scale)
183
+
184
+ if not return_dict:
185
+ return (output,)
186
+
187
+ return Transformer2DModelOutput(sample=output)