import numpy as np
from scipy import special
import torch
from transformers import AutoTokenizer
from .hashing import get_seed_rng
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # not used directly in this file
class WmDetector:
def __init__(self,
tokenizer: AutoTokenizer,
ngram: int = 1,
seed: int = 0
):
# model config
self.tokenizer = tokenizer
self.vocab_size = self.tokenizer.vocab_size
# watermark config
self.ngram = ngram
self.seed = seed
self.rng = torch.Generator()
self.rng.manual_seed(self.seed)
def aggregate_scores(
self,
        scores: list[np.ndarray],
aggregation: str = 'mean'
) -> float:
"""Aggregate scores along a text."""
if aggregation == 'sum':
return scores.sum(axis=0)
elif aggregation == 'mean':
return scores.mean(axis=0)
elif aggregation == 'max':
return scores.max(axis=0)
else:
raise ValueError(f'Aggregation {aggregation} not supported.')
def get_details(
self,
text: str,
        scoring_method: str = "v2",
        ntoks_max: int | None = None,
) -> list[dict]:
"""
Get score increment for each token in text.
Args:
text: input text
scoring_method:
'none': score all ngrams
                'v1': only score tokens whose watermark window (last ngram tokens) is unique
                'v2': only score tokens for which the {watermark window + token} pair is unique
            ntoks_max: maximum number of tokens to score
Output:
token_details: list of dicts containing token info and scores
"""
tokens_id = self.tokenizer.encode(text, add_special_tokens=False)
if ntoks_max is not None:
tokens_id = tokens_id[:ntoks_max]
total_len = len(tokens_id)
token_details = []
seen_grams = set()
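        # tracks windows (v1) or {window + token} tuples (v2) already scored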
# Add initial tokens that can't be scored (not enough context)
num_start = min(self.ngram, total_len)
for i in range(num_start):
token_details.append({
'token_id': tokens_id[i],
'is_scored': False,
'score': float('nan'),
'token_text': self.tokenizer.decode([tokens_id[i]])
})
# Score remaining tokens
for cur_pos in range(self.ngram, total_len):
ngram_tokens = tokens_id[cur_pos-self.ngram:cur_pos]
is_scored = True
if scoring_method == 'v1':
tup_for_unique = tuple(ngram_tokens)
is_scored = tup_for_unique not in seen_grams
if is_scored:
seen_grams.add(tup_for_unique)
elif scoring_method == 'v2':
tup_for_unique = tuple(ngram_tokens + [tokens_id[cur_pos]])
is_scored = tup_for_unique not in seen_grams
if is_scored:
seen_grams.add(tup_for_unique)
score = float('nan')
if is_scored:
score = self.score_tok(ngram_tokens, tokens_id[cur_pos])
score = float(score)
token_details.append({
'token_id': tokens_id[cur_pos],
'is_scored': is_scored,
'score': score,
'token_text': self.tokenizer.decode([tokens_id[cur_pos]])
})
return token_details
def get_pvalues_by_tok(
self,
token_details: list[dict]
) -> tuple[list[float], dict]:
"""
        Get the running p-value after each token.
Args:
token_details: list of dicts containing token info and scores from get_details()
Returns:
tuple containing:
- list of p-values, with nan for unscored tokens
- dict with auxiliary information:
- final_score: final running score
- ntoks_scored: final number of scored tokens
- final_pvalue: last non-nan pvalue (0.5 if none available)
"""
pvalues = []
running_score = 0
ntoks_scored = 0
eps = 1e-10 # small constant to avoid numerical issues
last_valid_pvalue = 0.5 # default value if no tokens are scored
for token in token_details:
if token['is_scored']:
running_score += token['score']
ntoks_scored += 1
pvalue = self.get_pvalue(running_score, ntoks_scored, eps)
last_valid_pvalue = pvalue
pvalues.append(pvalue)
else:
pvalues.append(float('nan'))
aux_info = {
'final_score': running_score,
'ntoks_scored': ntoks_scored,
'final_pvalue': last_valid_pvalue
}
return pvalues, aux_info
def score_tok(self, ngram_tokens: list[int], token_id: int):
""" for each token in the text, compute the score increment """
raise NotImplementedError
def get_pvalue(self, score: float, ntoks: int, eps: float):
""" compute the p-value for a couple of score and number of tokens """
raise NotImplementedError
class MarylandDetector(WmDetector):
def __init__(self,
tokenizer: AutoTokenizer,
ngram: int = 1,
seed: int = 0,
gamma: float = 0.5,
delta: float = 1.0,
**kwargs):
super().__init__(tokenizer, ngram, seed, **kwargs)
self.gamma = gamma
self.delta = delta
def score_tok(self, ngram_tokens, token_id):
"""
score_t = 1 if token_id in greenlist else 0
"""
seed = get_seed_rng(self.seed, ngram_tokens)
self.rng.manual_seed(seed)
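        # the permutation below depends only on the seeded rng, i.e. on the
        # ngram context, so detection can replay the greenlist without the model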
scores = torch.zeros(self.vocab_size)
vocab_permutation = torch.randperm(self.vocab_size, generator=self.rng)
        greenlist = vocab_permutation[:int(self.gamma * self.vocab_size)]  # first gamma * vocab_size tokens form the greenlist
scores[greenlist] = 1
return scores[token_id]
    def get_pvalue(self, score: float, ntoks: int, eps: float):
""" from cdf of a binomial distribution """
pvalue = special.betainc(score, 1 + ntoks - score, self.gamma)
return max(pvalue, eps)
class MarylandDetectorZ(WmDetector):
def __init__(self,
tokenizer: AutoTokenizer,
ngram: int = 1,
seed: int = 0,
gamma: float = 0.5,
delta: float = 1.0,
**kwargs):
super().__init__(tokenizer, ngram, seed, **kwargs)
self.gamma = gamma
self.delta = delta
def score_tok(self, ngram_tokens, token_id):
""" same as MarylandDetector but using zscore """
seed = get_seed_rng(self.seed, ngram_tokens)
self.rng.manual_seed(seed)
scores = torch.zeros(self.vocab_size)
vocab_permutation = torch.randperm(self.vocab_size, generator=self.rng)
        greenlist = vocab_permutation[:int(self.gamma * self.vocab_size)]  # first gamma * vocab_size tokens form the greenlist
scores[greenlist] = 1
return scores[token_id]
    def get_pvalue(self, score: float, ntoks: int, eps: float):
""" from cdf of a normal distribution """
zscore = (score - self.gamma * ntoks) / np.sqrt(self.gamma * (1 - self.gamma) * ntoks)
pvalue = 0.5 * special.erfc(zscore / np.sqrt(2))
return max(pvalue, eps)
class OpenaiDetector(WmDetector):
def __init__(self,
tokenizer: AutoTokenizer,
ngram: int = 1,
seed: int = 0,
**kwargs):
super().__init__(tokenizer, ngram, seed, **kwargs)
def score_tok(self, ngram_tokens, token_id):
"""
        score_t = -log(1 - r_t[token_id])
"""
seed = get_seed_rng(self.seed, ngram_tokens)
self.rng.manual_seed(seed)
        rs = torch.rand(self.vocab_size, generator=self.rng)  # r_t: one uniform draw per vocab entry
scores = -(1 - rs).log()
return scores[token_id]
def get_pvalue(self, score: float, ntoks: int, eps: float):
""" from cdf of a gamma distribution """
pvalue = special.gammaincc(ntoks, score)
return max(pvalue, eps)
class OpenaiDetectorZ(WmDetector):
def __init__(self,
tokenizer: AutoTokenizer,
ngram: int = 1,
seed: int = 0,
**kwargs):
super().__init__(tokenizer, ngram, seed, **kwargs)
def score_tok(self, ngram_tokens, token_id):
""" same as OpenaiDetector but using zscore """
seed = get_seed_rng(self.seed, ngram_tokens)
self.rng.manual_seed(seed)
        rs = torch.rand(self.vocab_size, generator=self.rng)  # r_t: one uniform draw per vocab entry
scores = -(1 - rs).log()
return scores[token_id]
def get_pvalue(self, score: float, ntoks: int, eps: float):
""" from cdf of a normal distribution """
mu0 = 1
sigma0 = np.pi / np.sqrt(6)
zscore = (score/ntoks - mu0) / (sigma0 / np.sqrt(ntoks))
pvalue = 0.5 * special.erfc(zscore / np.sqrt(2))
return max(pvalue, eps)
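
if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module. Because of the
    # relative `.hashing` import above, this must be run as a module inside its
    # package (e.g. `python -m <package>.<this_module>`, path hypothetical).
    # The "gpt2" tokenizer is used purely as an example; any AutoTokenizer works.
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    detector = MarylandDetector(tokenizer, ngram=2, seed=0, gamma=0.5, delta=1.0)
    sample_text = "The quick brown fox jumps over the lazy dog."
    token_details = detector.get_details(sample_text, scoring_method="v2")
    pvalues, aux = detector.get_pvalues_by_tok(token_details)
    print(f"scored {aux['ntoks_scored']} tokens, "
          f"final score {aux['final_score']:.2f}, "
          f"final p-value {aux['final_pvalue']:.3f}")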