import torch
from sdlm.models.roberta.modeling_roberta import RobertaForDiffusionLM


# Roberta with the CDF timestep warper: each position's timestep is re-mapped based on
# its distance to the context (GAR warp) or following the AR-Diffusion schedule.
class GARDiffusionLM(RobertaForDiffusionLM):
    def __init__(self, config):
        super().__init__(config)
        # If True, use the GAR warp; otherwise fall back to the AR-Diffusion warp.
        self.use_gar_warp = True
        # In [0, 1]. Higher values warp more aggressively, i.e. tokens closer to the
        # context are driven to timestep 0 faster.
        self.gar_aggression = 0.5

    def warp_timesteps(
        self,
        timesteps: torch.FloatTensor,
        token_input=None,
        span_mask=None,
        t_min=0,
        t_max=1,
    ):
        # Dispatch to the selected warping scheme.
        if self.use_gar_warp:
            return self.gar_warp_timesteps(
                timesteps, token_input, span_mask, t_min, t_max
            )
        else:
            return self.ar_warp_timesteps(
                timesteps, token_input, span_mask, t_min, t_max
            )

    def gar_warp_timesteps(
        self,
        timesteps: torch.FloatTensor,
        token_input=None,
        span_mask=None,
        t_min=0,
        t_max=1,
    ):
        # Ensure timesteps is a floating point tensor for computations
        timesteps = timesteps.float()
        # Context positions: True where the token is *not* in the generation span
        # (span_mask marks the tokens being diffused / generated).
        token_masks = ~span_mask
        # Create a tensor representing each position in the sequence [0, 1, ..., seq_len-1]
        seq_len = token_input.size(1)
        positions = torch.arange(seq_len, device=token_input.device).float()
        # Pairwise absolute distances between positions.
        # Shape of distances: [1, seq_len, seq_len], broadcast over the batch below.
        distances = (
            positions.unsqueeze(0).unsqueeze(2) - positions.unsqueeze(0).unsqueeze(1)
        ).abs()
        # Zero out distances to generated (masked) positions so that only context
        # tokens remain as candidates for the min below.
        masked_distances = distances * token_masks.unsqueeze(1).float()
        # Push the generated-position columns above any real distance so they never
        # win the min operation.
        masked_distances = (masked_distances.shape[-1] + 5) * (
            1 - token_masks.unsqueeze(1).float()
        ) + masked_distances
        # Rather than sum, take the min of the distances, i.e. each position's minimum
        # distance to a non-masked (context) token.
        composed = masked_distances.min(dim=2).values
        # Optionally set padding tokens to 1, since we don't want these to affect the warping:
        # composed = torch.where(
        #     token_input == 1, torch.tensor(1.0, device=token_input.device), composed
        # )
        # Normalize so the warping is consistent across different sequence lengths.
        composed_max, _ = composed.max(dim=1, keepdim=True)
        composed_normalized = (
            composed / composed_max
        )  # Now composed_normalized is in range [0, 1]
        composed_normalized = (
            1 - composed_normalized
        )  # Invert: positions nearest the context now get values near 1
        composed_normalized = (
            composed_normalized * self.gar_aggression
        )  # Scale the values to range [0, gar_aggression]
        # Adjust timesteps based on composed_normalized values.
        # Ensure the operation is broadcastable: [batch_size, 1] * [batch_size, seq_len].
        # Clamp the denominator to stay strictly negative so a composed_normalized of
        # exactly 1 (possible when gar_aggression == 1) cannot cause a division by zero.
        slope = -t_max / torch.clip(t_max * composed_normalized - t_max, max=-1e-8)
        adjusted_timesteps = slope * (timesteps - t_max) + t_max
        adjusted_timesteps = torch.clip(adjusted_timesteps, min=t_min, max=t_max)
        return adjusted_timesteps.long()

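    # Worked example of the GAR warp (illustrative numbers, not from any training
    # config): with t_max = 100 and a proximity score composed_normalized = c, the
    # slope is 1 / (1 - c), so the warped timestep is (t - 100) / (1 - c) + 100.
    #   c = 0.0 (far from context): warped t == t, the token follows the global schedule.
    #   c = 0.4: warped t reaches 0 once the global t drops to 40 (= c * t_max),
    #            so tokens near the context finish denoising earlier.
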
    # Warp timesteps following the AR-Diffusion paper.
    def ar_warp_timesteps(
        self,
        timesteps: torch.FloatTensor,
        token_input=None,
        span_mask=None,
        t_min=0,
        t_max=1,
    ):
        N = 512  # assumed maximum number of generated positions
        T = t_max  # total number of diffusion timesteps
        ne = 2 * N  # position coordinate of the movement's end anchor
        te = T  # timestep coordinate of the movement's end anchor
        # Ensure timesteps is a floating point tensor for computations
        timesteps = timesteps.float()
        # Rescale timesteps to [0, 1]
        timesteps = (timesteps - t_min) / (t_max - t_min)
        # Scale up to [0, N + T] (the range used by AR-Diffusion)
        timesteps = timesteps * (N + T)
        # Create a tensor representing each position in the sequence [0, 1, ..., seq_len-1]
        seq_len = token_input.size(1)
        positions = torch.arange(seq_len, device=token_input.device).float().view(1, -1)
        # Based on the span mask, only consider positions we generate for: shift the
        # indices so the first generated token sits at position 0.
        input_ends = (1 - span_mask.long()).sum(-1)
        positions = torch.clip(positions - input_ends.view(-1, 1), min=0)
        # Calculate the starting anchor points of the movement
        ns = torch.clip(N - timesteps, 0, N)
        ts = torch.clip(timesteps - N, 0, T)
        adjusted_timesteps = torch.clip(
            ((te - ts) / (ne - ns)) * (positions - ns) + ts, 0, T
        )
        # The result is already clipped to [0, T], so we are done!
        return adjusted_timesteps.long()
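
    # How the AR-Diffusion warp behaves at the boundaries (a sanity check using only
    # the variables defined above, no new assumptions):
    #   rescaled timestep at its maximum (N + T): ns = 0, ts = T, and the slope
    #     (te - ts) / (ne - ns) is 0, so every position gets timestep T (fully noised);
    #   rescaled timestep at 0: ns = N, ts = 0, and positions p <= N map to
    #     (T / N) * (p - N) <= 0, clipped to 0 (fully denoised);
    #   in between, smaller positions reach 0 before larger ones, giving the
    #   left-to-right denoising order of AR-Diffusion.
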
    # No need to override forward(): the warper is deterministic and has no trainable parameters.
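

# A minimal, self-contained sanity check (an illustrative sketch, not part of the
# training code): the batch size, sequence length, span mask, and t_max below are
# made-up values, and a SimpleNamespace stands in for `self` since the warps only
# read `gar_aggression` and touch no other model state.
if __name__ == "__main__":
    from types import SimpleNamespace

    dummy = SimpleNamespace(use_gar_warp=True, gar_aggression=0.5)
    batch_size, seq_len, t_max = 2, 8, 100
    # One shared timestep per example, halfway through the schedule.
    timesteps = torch.full((batch_size, 1), 50.0)
    token_input = torch.zeros(batch_size, seq_len, dtype=torch.long)
    # First three tokens are context; the remaining five are being generated.
    span_mask = torch.tensor([[False] * 3 + [True] * 5] * batch_size)

    gar = GARDiffusionLM.gar_warp_timesteps(
        dummy, timesteps, token_input, span_mask, t_min=0, t_max=t_max
    )
    ar = GARDiffusionLM.ar_warp_timesteps(
        dummy, timesteps, token_input, span_mask, t_min=0, t_max=t_max
    )
    # The GAR warp assigns smaller timesteps to tokens nearer the context; with only
    # 8 positions against N = 512, the AR-Diffusion warp is nearly flat here.
    print("gar warp:", gar)
    print("ar warp:", ar)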