File size: 7,034 Bytes
92c0372 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
import unicodedata
from typing import List, Tuple
import torch
from transformers import AutoModelForTokenClassification, AutoTokenizer
class NoiseDetector:
def __init__(self, model_path: str):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = AutoModelForTokenClassification.from_pretrained(model_path).to(
self.device
)
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
self.model.eval()
def _normalize_text(self, text: str) -> str:
return unicodedata.normalize("NFKC", text)
def _convert_token_spans_to_char_spans(
self,
text: str,
noise_token_indices: List[int],
offset_mapping: List[Tuple[int, int]],
) -> List[Tuple[int, int]]:
char_spans = []
current_span = None
for idx, (is_noise, (start, end)) in enumerate(
zip(noise_token_indices, offset_mapping)
):
# Skip special tokens (CLS, SEP, etc.)
if start == end == 0:
continue
if is_noise:
if current_span is None:
current_span = [start, end]
else:
current_span[1] = end
elif current_span is not None:
char_spans.append(tuple(current_span))
current_span = None
# Don't forget to add the last span if it exists
if current_span is not None:
char_spans.append(tuple(current_span))
return char_spans
def detect(
self, texts: List[str], threshold: float = 0.5
) -> List[List[Tuple[int, int]]]:
"""
Detect noise spans in the given texts.
Args:
texts: List of input texts
threshold: Confidence threshold for noise detection (default: 0.5)
Returns:
List of lists containing (start, end) character positions of detected noise spans for each text
"""
results = []
with torch.no_grad():
for text in texts:
# Normalize text
normalized_text = self._normalize_text(text)
# Tokenize
tokens = self.tokenizer(
normalized_text,
truncation=True,
return_offsets_mapping=True,
return_tensors="pt",
)
# Move to device
input_ids = tokens["input_ids"].to(self.device)
attention_mask = tokens["attention_mask"].to(self.device)
# Get predictions
outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
logits = outputs.logits
# Convert logits to probabilities
probs = torch.softmax(logits, dim=-1)
# Get noise predictions (class 1)
noise_probs = probs[0, :, 1].cpu().numpy()
noise_predictions = (noise_probs > threshold).astype(int)
# Convert token-level predictions to character spans
char_spans = self._convert_token_spans_to_char_spans(
normalized_text,
noise_predictions,
tokens["offset_mapping"][0].tolist(),
)
results.append(char_spans)
return results
def detect_and_highlight(
self, texts: List[str], threshold: float = 0.5
) -> List[str]:
"""
Detect noise spans and return texts with noise sections highlighted.
Args:
texts: List of input texts
threshold: Confidence threshold for noise detection (default: 0.5)
Returns:
List of texts with noise sections wrapped in [NOISE]...[/NOISE] tags
"""
noise_spans = self.detect(texts, threshold)
highlighted_texts = []
for text, spans in zip(texts, noise_spans):
if not spans:
highlighted_texts.append(text)
continue
# Sort spans by start position
spans = sorted(spans)
# Build highlighted text
result = []
last_end = 0
for start, end in spans:
# Add text before noise
result.append(text[last_end:start])
# Add highlighted noise
# もし長さがN以下なら、ハイライトしない
if end - start > 3:
result.append(f"[NOISE]{text[start:end]}[/NOISE]")
else:
result.append(text[start:end])
# result.append(f"[NOISE]{text[start:end]}[/NOISE]")
last_end = end
# Add remaining text
result.append(text[last_end:])
highlighted_texts.append("".join(result))
return highlighted_texts
def main():
model_path = "hotchpotch/fineweb-2-japanese-text-cleaner"
detector = NoiseDetector(model_path)
NOISE_TEXT = """
この文章は90日以上更新の無いサイトに表示されています。
ログイン ログアウト
本当に必要な文章以外にも、さまざまなノイズが含まれていることがあります。例えば、この文章もその一例です。本来不要なテキストが入ってしまうことがこのようにあるでしょう。
今なら50%オフ!クリックしてリンク先の商品を表示
とりわけ文章長が短い場合、文章のほとんどがノイズを含む可能性があります。それらを取り除くことで、より高品質の文章を抽出できないかと考えています。
前のページ 次のページ
""".strip()
texts = [
NOISE_TEXT,
"これは正常なテキストです。しかし、ここに🤣絵文字があります。そして普通の文章が続きます。",
"普通の文章です。ASCII ART(^_^)があります。最後も普通です。",
"ログイン 文章の密ベクトルは、情報検索・文章判別・類似文章抽出など、さまざまな用途に使うことができます。しかしながら最先端のTransformerモデルは小さいモデルでも、とりわけCPU環境では処理速度が遅いため実用でないこともしばしばあります。この課題を解決する新しいアプローチとして、先日公開されたTransformerモデル「ではない」 StaticEmbeddingモデルは、例えば intfloat/multilingual-e5-small (以下mE5-small)とのベンチマーク比較では85%のスコアという最低十分な性能で、何よりCPUで動作時に126倍高速に文ベクトルを作成することができる、という驚きの速度です。 記事の一覧 >",
]
highlighted_texts = detector.detect_and_highlight(texts, threshold=0.7)
for text in highlighted_texts:
print(f"\n{text}")
if __name__ == "__main__":
main()
|