authorchecks-backend / model_handler.py
Jaja-09's picture
.
44272cc
"""
Model Handler for Two-Branch AI Detection Model
Combines DeBERTa embeddings with sentiment features
Uses XGBoost for final classification
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoConfig, AutoModel, PreTrainedModel, AutoModelForSequenceClassification
import os
import logging
import time
from typing import Dict, Any, Optional, List, Tuple
import numpy as np
from pathlib import Path
import xgboost as xgb
import json
import nltk
from nltk.tokenize import sent_tokenize
# Download NLTK data
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
nltk.download('punkt', quiet=True)
try:
nltk.data.find('tokenizers/punkt_tab')
except LookupError:
nltk.download('punkt_tab', quiet=True)
logger = logging.getLogger(__name__)
class DesklibAIDetectionModel(PreTrainedModel):
"""
DeBERTa-based AI detection model
Architecture from desklib/ai-text-detector-v1.01
"""
config_class = AutoConfig
def __init__(self, config):
super().__init__(config)
# Initialize the base transformer model
self.model = AutoModel.from_config(config)
# Define a classifier head
self.classifier = nn.Linear(config.hidden_size, 1)
# Initialize weights
self.init_weights()
def forward(self, input_ids, attention_mask=None, labels=None):
# Forward pass through the transformer
outputs = self.model(input_ids, attention_mask=attention_mask)
last_hidden_state = outputs[0]
# Mean pooling
input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, dim=1)
sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9)
pooled_output = sum_embeddings / sum_mask
# Classifier
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
loss_fct = nn.BCEWithLogitsLoss()
loss = loss_fct(logits.view(-1), labels.float())
output = {"logits": logits}
if loss is not None:
output["loss"] = loss
return output
class AIDetectionModelHandler:
"""
Handles Two-Branch AI detection:
- DeBERTa for semantic embeddings
- Sentiment features (avg_polarity, polarity_variance)
- XGBoost for final classification
"""
def __init__(self, model_path: Optional[str] = None, max_length: int = 512):
"""
Initialize the model handler
Args:
model_path: Path to the model directory (default: ../model/model)
max_length: Maximum token length for input text
"""
self.max_length = max_length
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.deberta_model = None
self.tokenizer = None
self.sentiment_model = None
self.sentiment_tokenizer = None
self.xgboost_model = None
self.model_loaded = False
# Default model paths
if model_path is None:
# Prefer explicit env var
env_model_path = os.getenv("MODEL_PATH")
if env_model_path and os.path.exists(env_model_path):
model_path = env_model_path
elif os.path.exists("/app/model"):
model_path = "/app/model"
else:
# Fallback to legacy relative path
backend_dir = Path(__file__).parent
model_path = str(backend_dir.parent / "model" / "model")
self.model_path = model_path
# XGBoost file is expected inside the same folder as the other model artifacts
self.xgboost_path = str(Path(model_path) / "xgboost_model.json")
# Load the models
self._load_models()
def _load_models(self):
"""Load DeBERTa, sentiment model, and XGBoost classifier"""
try:
logger.info(f"CUDA available: {torch.cuda.is_available()}")
logger.info(f"Selected device: {self.device}")
logger.info(f"Loading models from: {self.model_path}")
logger.info(f"Using device: {self.device}")
# Check if model path exists
if not os.path.exists(self.model_path):
logger.error(f"Model path does not exist: {self.model_path}")
raise FileNotFoundError(f"Model not found at {self.model_path}")
# 1. Load DeBERTa tokenizer and model
logger.info("Loading DeBERTa tokenizer...")
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
logger.info("Loading DeBERTa model...")
self.deberta_model = DesklibAIDetectionModel.from_pretrained(self.model_path)
self.deberta_model.to(self.device)
self.deberta_model.eval()
print("DeBERTa model device:", next(self.deberta_model.parameters()).device)
# 2. Load sentiment analysis model (DistilBERT)
logger.info("Loading sentiment model...")
sentiment_model_name = "distilbert-base-uncased-finetuned-sst-2-english"
self.sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)
self.sentiment_model.to(self.device)
self.sentiment_model.eval()
print("Sentiment model device:", next(self.sentiment_model.parameters()).device)
# 3. Load XGBoost model
if os.path.exists(self.xgboost_path):
logger.info(f"Loading XGBoost model from: {self.xgboost_path}")
t0 = time.perf_counter()
self.xgboost_model = xgb.Booster()
self.xgboost_model.load_model(self.xgboost_path)
# Force GPU or CPU depending on hardware
if torch.cuda.is_available():
logger.info("Setting XGBoost to use GPU predictor")
try:
self.xgboost_model.set_param({"predictor": "gpu_predictor", "tree_method": "gpu_hist"})
logger.info("XGBoost configured to use GPU (gpu_predictor, gpu_hist)")
except Exception as ie:
logger.warning(f"Failed to set XGBoost GPU params: {ie}")
else:
logger.info("Setting XGBoost to use CPU predictor")
try:
self.xgboost_model.set_param({"predictor": "cpu_predictor", "tree_method": "hist"})
except Exception as ie:
logger.warning(f"Failed to set XGBoost CPU params: {ie}")
t1 = time.perf_counter()
logger.info(f"XGBoost model loaded in {t1 - t0:.4f}s")
logger.info("✅ XGBoost model loaded!")
else:
logger.warning(f"XGBoost model not found at {self.xgboost_path}, using DeBERTa only")
self.xgboost_model = None
# 🔍 OPTIONAL: PRINT GPU NAME
if torch.cuda.is_available():
print("GPU detected:", torch.cuda.get_device_name(0))
self.model_loaded = True
logger.info("✅ All models loaded successfully!")
except Exception as e:
logger.error(f"Failed to load models: {e}", exc_info=True)
self.model_loaded = False
raise
def is_loaded(self) -> bool:
"""Check if model is loaded"""
return self.model_loaded
def get_sentiment_scores(self, text: str) -> List[float]:
"""
Extract sentiment scores for each sentence using DistilBERT
Args:
text: Input text
Returns:
List of sentiment scores (polarity) for each sentence
"""
try:
# Tokenize into sentences
sentences = sent_tokenize(text)
if not sentences:
return [0.5] # Neutral if no sentences
scores = []
start_total = time.perf_counter()
with torch.no_grad():
for i, sentence in enumerate(sentences):
s0 = time.perf_counter()
# Tokenize sentence
inputs = self.sentiment_tokenizer(
sentence,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Get sentiment prediction
outputs = self.sentiment_model(**inputs)
logits = outputs.logits
probabilities = F.softmax(logits, dim=-1)
# Get positive sentiment probability (index 1)
pos_prob = probabilities[0][1].item()
# Convert to polarity score (-1 to 1, where 0.5 is neutral)
polarity = (pos_prob - 0.5) * 2 # Maps [0,1] to [-1,1]
scores.append(polarity)
s1 = time.perf_counter()
logger.debug(f"Sentiment sentence processed in {s1 - s0:.4f}s")
total_time = time.perf_counter() - start_total
logger.info(f"Extracted sentiment scores for {len(sentences)} sentences in {total_time:.4f}s")
return scores
except Exception as e:
logger.error(f"Error extracting sentiment scores: {e}")
return [0.0] # Return neutral on error
def extract_sentiment_features(self, text: str) -> np.ndarray:
"""
Extract avg_polarity and polarity_variance from text
Args:
text: Input text
Returns:
Numpy array with [avg_polarity, polarity_variance]
"""
start = time.perf_counter()
sentiment_scores = self.get_sentiment_scores(text)
# Calculate features
avg_polarity = float(np.mean(sentiment_scores)) if sentiment_scores else 0.0
polarity_variance = float(np.var(sentiment_scores)) if len(sentiment_scores) > 1 else 0.0
duration = time.perf_counter() - start
logger.info(f"Sentiment features extracted in {duration:.4f}s (avg_polarity={avg_polarity:.4f}, variance={polarity_variance:.4f})")
return np.array([avg_polarity, polarity_variance], dtype=np.float32)
def get_deberta_embeddings(self, text: str) -> np.ndarray:
"""
Get DeBERTa embeddings for text using mean pooling
Args:
text: Input text
Returns:
Numpy array of embeddings
"""
try:
t_total = time.perf_counter()
# Tokenize input
t0 = time.perf_counter()
encoded = self.tokenizer(
text,
padding='max_length',
truncation=True,
max_length=self.max_length,
return_tensors='pt'
)
t1 = time.perf_counter()
logger.debug(f"Tokenization time: {t1 - t0:.4f}s")
input_ids = encoded['input_ids'].to(self.device)
attention_mask = encoded['attention_mask'].to(self.device)
# Get embeddings
with torch.no_grad():
t0 = time.perf_counter()
outputs = self.deberta_model.model(input_ids=input_ids, attention_mask=attention_mask)
t1 = time.perf_counter()
logger.debug(f"Transformer forward pass time: {t1 - t0:.4f}s")
last_hidden_state = outputs[0]
# Mean pooling
t0 = time.perf_counter()
input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, dim=1)
sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9)
pooled_output = sum_embeddings / sum_mask
t1 = time.perf_counter()
logger.debug(f"Pooling time: {t1 - t0:.4f}s")
# Convert to numpy
embeddings = pooled_output.cpu().numpy().flatten()
total = time.perf_counter() - t_total
return embeddings
except Exception as e:
logger.error(f"Error extracting DeBERTa embeddings: {e}", exc_info=True)
raise
def predict_probability(self, text: str, threshold: float = 0.5) -> Dict[str, Any]:
"""
Predict if text is AI-generated using two-branch architecture
Args:
text: Input text to analyze
threshold: Classification threshold (default: 0.5)
Returns:
Dictionary with probability, label, sentiment features
"""
if not self.model_loaded:
raise RuntimeError("Model not loaded. Cannot perform prediction.")
try:
overall_start = time.perf_counter()
# Extract sentiment features
logger.info("Extracting sentiment features...")
sentiment_start = time.perf_counter()
sentiment_features = self.extract_sentiment_features(text)
sentiment_time = time.perf_counter() - sentiment_start
avg_polarity = float(sentiment_features[0])
polarity_variance = float(sentiment_features[1])
logger.info(f"Sentiment extraction took {sentiment_time:.4f}s")
# If XGBoost is available, use the full two-branch pipeline
if self.xgboost_model is not None:
logger.info("Using XGBoost two-branch model...")
embed_start = time.perf_counter()
# Get DeBERTa embeddings
deberta_embeddings = self.get_deberta_embeddings(text)
embed_time = time.perf_counter() - embed_start
logger.info(f"DeBERTa embedding extraction took {embed_time:.4f}s")
# Combine features: DeBERTa embeddings + sentiment features
combined_features = np.concatenate([deberta_embeddings, sentiment_features])
# Create DMatrix for XGBoost
dmatrix = xgb.DMatrix(combined_features.reshape(1, -1))
# Predict
xgb_start = time.perf_counter()
probability = float(self.xgboost_model.predict(dmatrix)[0])
xgb_time = time.perf_counter() - xgb_start
logger.info(f"XGBoost prediction took {xgb_time:.4f}s")
else:
# Fallback to DeBERTa only
logger.info("Using DeBERTa model only (XGBoost not found)...")
encoded = self.tokenizer(
text,
padding='max_length',
truncation=True,
max_length=self.max_length,
return_tensors='pt'
)
input_ids = encoded['input_ids'].to(self.device)
attention_mask = encoded['attention_mask'].to(self.device)
with torch.no_grad():
t0 = time.perf_counter()
outputs = self.deberta_model(input_ids=input_ids, attention_mask=attention_mask)
t1 = time.perf_counter()
logger.info(f"DeBERTa forward & classification took {t1 - t0:.4f}s")
logits = outputs["logits"]
probability = torch.sigmoid(logits).item()
label = 1 if probability >= threshold else 0
overall_time = time.perf_counter() - overall_start
logger.info(f"Total prediction pipeline took {overall_time:.4f}s (prob={probability:.4f})")
return {
"probability": probability,
"label": label,
"classification": "ai" if label == 1 else "human",
"confidence": probability if label == 1 else (1 - probability),
"sentiment_features": {
"avg_polarity": avg_polarity,
"polarity_variance": polarity_variance
}
}
except Exception as e:
logger.error(f"Prediction error: {e}", exc_info=True)
raise
def predict_single_text_xgboost(self, text: str) -> Tuple[float, int]:
"""
Predict AI probability and label for a single text using XGBoost model
Args:
text: Input text to analyze
Returns:
Tuple of (probability, label) where label is 0 for human, 1 for AI
"""
try:
start_total = time.perf_counter()
# Extract sentiment features
sentiment_features = self.extract_sentiment_features(text)
avg_polarity = float(sentiment_features[0])
polarity_variance = float(sentiment_features[1])
# If XGBoost is available, use the full two-branch pipeline
if self.xgboost_model is not None:
embed_start = time.perf_counter()
# Get DeBERTa embeddings
deberta_embeddings = self.get_deberta_embeddings(text)
embed_time = time.perf_counter() - embed_start
logger.info(f"DeBERTa embedding extraction took {embed_time:.4f}s")
# Combine features: DeBERTa embeddings + sentiment features
combined_features = np.concatenate([deberta_embeddings, sentiment_features])
# Create DMatrix for XGBoost
dmatrix = xgb.DMatrix(combined_features.reshape(1, -1))
xgb_start = time.perf_counter()
# Predict
probability = float(self.xgboost_model.predict(dmatrix)[0])
xgb_time = time.perf_counter() - xgb_start
logger.info(f"XGBoost prediction (single) took {xgb_time:.4f}s")
else:
# Fallback to DeBERTa only
encoded = self.tokenizer(
text,
padding='max_length',
truncation=True,
max_length=self.max_length,
return_tensors='pt'
)
input_ids = encoded['input_ids'].to(self.device)
attention_mask = encoded['attention_mask'].to(self.device)
with torch.no_grad():
t0 = time.perf_counter()
outputs = self.deberta_model(input_ids=input_ids, attention_mask=attention_mask)
t1 = time.perf_counter()
logger.info(f"DeBERTa forward (single) took {t1 - t0:.4f}s")
logits = outputs["logits"]
probability = torch.sigmoid(logits).item()
label = 1 if probability >= 0.5 else 0
total = time.perf_counter() - start_total
logger.info(f"predict_single_text_xgboost total time: {total:.4f}s")
return probability, label
except Exception as e:
logger.error(f"Single text prediction error: {e}", exc_info=True)
raise
def detect_mixed_text_chunk_based(self, text: str, chunk_size: int = 4, overlap: int = 1, min_chunk_length: int = 50) -> Dict[str, Any]:
"""
Improved mixed text detection using chunk-based analysis that influences overall probability
Args:
text: Input text string
chunk_size: Number of sentences per chunk (default: 4)
overlap: Number of sentences to overlap between chunks (default: 1)
min_chunk_length: Minimum character length for a chunk to be analyzed
Returns:
Dictionary with prediction results and analysis details
Note:
Input validation: Text must be 80-2000 words. Dynamic chunking: 4-5 sentences
analyzed as whole, then chunk size varies:
- 6-10 sentences: 3 sentences per chunk
- 11-20 sentences: 4 sentences per chunk
- 21-30 sentences: 5 sentences per chunk
- 31+ sentences: 6 sentences per chunk
Uses overlapping chunks to capture transitions between AI and human content.
"""
# Get overall prediction (your current method)
overall_prob, overall_label = self.predict_single_text_xgboost(text)
# Split text into sentences
sentences = sent_tokenize(text)
# Validate input text length (80-2000 words)
total_words = len(text.split())
if total_words < 80:
return {
'prediction': 'Human' if overall_label == 0 else 'AI',
'confidence': abs(overall_prob - 0.5) * 2,
'is_mixed': False,
'reason': f'Text too short for analysis ({total_words} words, minimum 80 words required)',
'overall_probability': overall_prob,
'modified_probability': overall_prob,
'chunk_analysis': []
}
elif total_words > 2000:
return {
'prediction': 'Human' if overall_label == 0 else 'AI',
'confidence': abs(overall_prob - 0.5) * 2,
'is_mixed': False,
'reason': f'Text too long for analysis ({total_words} words, maximum 2000 words allowed)',
'overall_probability': overall_prob,
'modified_probability': overall_prob,
'chunk_analysis': []
}
# Compute sentence character offsets (start/end) to map back to original text
sentence_offsets: List[Tuple[int, int]] = []
search_start = 0
for sent in sentences:
# find the sentence occurrence starting from search_start
idx = text.find(sent, search_start)
if idx == -1:
# fallback: skip whitespace and set to previous end
idx = search_start
start_char = idx
end_char = start_char + len(sent)
sentence_offsets.append((start_char, end_char))
search_start = end_char
# Dynamic chunking based on total sentence count
total_sentences = len(sentences)
# For 4-5 sentences, analyze as whole (no chunking)
if total_sentences <= 5:
return {
'prediction': 'Human' if overall_label == 0 else 'AI',
'confidence': abs(overall_prob - 0.5) * 2,
'is_mixed': False,
'reason': f'Analyzing {total_sentences} sentences as whole (4-5 sentence range)',
'overall_probability': overall_prob,
'modified_probability': overall_prob,
'chunk_analysis': []
}
# Dynamic chunk size based on total sentences
if total_sentences <= 10:
dynamic_chunk_size = 3
elif total_sentences <= 20:
dynamic_chunk_size = 4
elif total_sentences <= 30:
dynamic_chunk_size = 5
else:
dynamic_chunk_size = 6 # For very long texts
# Ensure we have enough sentences for at least 2 chunks
if total_sentences < dynamic_chunk_size * 2:
return {
'prediction': 'Human' if overall_label == 0 else 'AI',
'confidence': abs(overall_prob - 0.5) * 2,
'is_mixed': False,
'reason': f'Text too short for chunk analysis ({total_sentences} sentences, need at least {dynamic_chunk_size * 2})',
'overall_probability': overall_prob,
'modified_probability': overall_prob,
'chunk_analysis': []
}
# Create overlapping chunks and retain sentence index ranges
chunks = [] # textual chunks (for backward compat)
chunk_sentence_ranges: List[Tuple[int, int]] = [] # inclusive start, inclusive end sentence idx
chunk_predictions: List[Tuple[float, int]] = []
chunk_probabilities: List[float] = []
logger.info(f"Analyzing text with {total_sentences} sentences using dynamic chunk size of {dynamic_chunk_size}...")
for i in range(0, len(sentences) - dynamic_chunk_size + 1, dynamic_chunk_size - overlap):
# Create chunk from sentences
start_idx = i
end_idx = i + dynamic_chunk_size - 1
chunk_sentences = sentences[start_idx:end_idx + 1]
chunk_text = ' '.join(chunk_sentences)
# Only analyze chunks that meet minimum length requirement
if len(chunk_text.strip()) >= min_chunk_length:
chunks.append(chunk_text)
chunk_sentence_ranges.append((start_idx, end_idx))
# Analyze this chunk
prob, label = self.predict_single_text_xgboost(chunk_text)
chunk_predictions.append((prob, label))
chunk_probabilities.append(prob)
logger.info(f" Chunk {len(chunks)}: {chunk_text[:60]}... → {'AI' if label == 1 else 'Human'} ({prob:.3f})")
if len(chunk_predictions) < 2:
return {
'prediction': 'Human' if overall_label == 0 else 'AI',
'confidence': abs(overall_prob - 0.5) * 2,
'is_mixed': False,
'reason': 'Too few chunks for mixed analysis',
'overall_probability': overall_prob,
'modified_probability': overall_prob,
'chunk_probabilities': chunk_probabilities,
'raw_chunks': [],
'sentence_analysis': [],
'merged_spans': [],
'chunk_analysis': chunk_predictions
}
# Count human vs AI chunks
human_chunks = sum(1 for _, label in chunk_predictions if label == 0)
ai_chunks = sum(1 for _, label in chunk_predictions if label == 1)
total_chunks = len(chunk_predictions)
# Mixed text detection logic
is_mixed = human_chunks > 0 and ai_chunks > 0
mixed_ratio = min(human_chunks, ai_chunks) / total_chunks
chunk_avg_prob = float(np.mean(chunk_probabilities)) if chunk_probabilities else overall_prob
chunk_label = 'AI' if chunk_avg_prob >= 0.5 else 'Human'
logger.info(f"\nChunk Analysis Summary:")
logger.info(f" Total chunks analyzed: {total_chunks}")
logger.info(f" Human chunks: {human_chunks}")
logger.info(f" AI chunks: {ai_chunks}")
logger.info(f" Mixed ratio: {mixed_ratio:.2f}")
logger.info(f" Average chunk probability: {chunk_avg_prob:.3f}")
logger.info(f" Chunk-derived label: {chunk_label}")
if is_mixed:
final_prediction = 'Mixed'
modified_prob = chunk_avg_prob
confidence = 1.0 - mixed_ratio
logger.info(" → MIXED TEXT DETECTED (chunk-based)")
else:
final_prediction = chunk_label
modified_prob = chunk_avg_prob
confidence = abs(chunk_avg_prob - 0.5) * 2
logger.info(f" → Pure {chunk_label} text based on chunk probabilities")
# Build detailed raw_chunks with character offsets
raw_chunks: List[Dict[str, Any]] = []
for idx, ((prob, label), (sent_start, sent_end)) in enumerate(zip(chunk_predictions, chunk_sentence_ranges)):
# Map sentence indices to char offsets
start_char = sentence_offsets[sent_start][0] if sent_start < len(sentence_offsets) else 0
end_char = sentence_offsets[sent_end][1] if sent_end < len(sentence_offsets) else len(text)
chunk_text = text[start_char:end_char]
raw_chunks.append({
'chunk_index': idx,
'start_char': start_char,
'end_char': end_char,
'text': chunk_text,
'probability': float(prob),
'label': 'ai' if label == 1 else 'human',
'sentence_range': [sent_start, sent_end]
})
# Compute per-sentence aggregated probabilities and labels (weighted by chunk presence)
sentence_analysis: List[Dict[str, Any]] = []
for si in range(len(sentences)):
# Find chunks covering this sentence
covering_probs: List[float] = []
covering_labels: List[int] = []
for (prob, label), (cs, ce) in zip(chunk_predictions, chunk_sentence_ranges):
if cs <= si <= ce:
covering_probs.append(prob)
covering_labels.append(label)
if covering_probs:
avg_prob = float(np.mean(covering_probs))
# Use weighted/average probability as primary signal, but also
# consider chunk label majority with a safety threshold.
# Tighten AI labeling by requiring a higher probability threshold
# to reduce false positives from noisy chunks.
label_frac = float(np.mean(covering_labels)) if covering_labels else 0.0
AI_PROB_THRESHOLD = 0.55
# If average probability is confidently AI, mark as AI.
if avg_prob >= AI_PROB_THRESHOLD:
sentence_label = 'ai'
# Otherwise, if majority of covering chunks are labeled AI and
# probability is at least 0.5, mark as AI (minority case).
elif label_frac > 0.5 and avg_prob >= 0.5:
sentence_label = 'ai'
else:
sentence_label = 'human'
else:
# No covering chunks: use nearest-chunk fallback (prefer previous chunk,
# otherwise next chunk). This avoids falling back to the global overall_prob
# which can make trailing sentences inherit the global label.
nearest_prob = None
nearest_label = None
# find previous chunk index (the last chunk that ends before this sentence)
prev_idx = None
for idx, (cs, ce) in enumerate(chunk_sentence_ranges):
if ce < si:
prev_idx = idx
if prev_idx is not None:
nearest_prob, nearest_label = chunk_predictions[prev_idx]
else:
# find next chunk index (the first chunk that starts after this sentence)
next_idx = None
for idx, (cs, ce) in enumerate(chunk_sentence_ranges):
if cs > si:
next_idx = idx
break
if next_idx is not None:
nearest_prob, nearest_label = chunk_predictions[next_idx]
if nearest_prob is not None:
avg_prob = float(nearest_prob)
sentence_label = 'ai' if nearest_label == 1 else 'human'
else:
# Fallback to overall prediction if there are truly no chunks
avg_prob = overall_prob
sentence_label = 'ai' if overall_label == 1 else 'human'
start_c, end_c = sentence_offsets[si] if si < len(sentence_offsets) else (0, 0)
sentence_analysis.append({
'sentence_index': si,
'start_char': start_c,
'end_char': end_c,
'text': sentences[si],
'avg_probability': avg_prob,
'label': sentence_label
})
# Merge adjacent sentences with same label into non-overlapping spans for easy frontend rendering
merged_spans: List[Dict[str, Any]] = []
if sentence_analysis:
cur = sentence_analysis[0]
cur_start = cur['start_char']
cur_end = cur['end_char']
cur_label = cur['label']
cur_probs = [cur['avg_probability']]
for s in sentence_analysis[1:]:
if s['label'] == cur_label:
# extend current span
cur_end = s['end_char']
cur_probs.append(s['avg_probability'])
else:
merged_spans.append({
'start_char': cur_start,
'end_char': cur_end,
'label': cur_label,
'avg_probability': float(np.mean(cur_probs))
})
# start a new span
cur_start = s['start_char']
cur_end = s['end_char']
cur_label = s['label']
cur_probs = [s['avg_probability']]
# append final span
merged_spans.append({
'start_char': cur_start,
'end_char': cur_end,
'label': cur_label,
'avg_probability': float(np.mean(cur_probs))
})
return {
'prediction': final_prediction,
'confidence': confidence,
'is_mixed': is_mixed,
'mixed_ratio': mixed_ratio,
'human_chunks': human_chunks,
'ai_chunks': ai_chunks,
'total_chunks': total_chunks,
'overall_probability': overall_prob,
'modified_probability': modified_prob,
'chunk_probabilities': chunk_probabilities,
'chunk_analysis': chunk_predictions,
'raw_chunks': raw_chunks,
'sentence_analysis': sentence_analysis,
'merged_spans': merged_spans,
'chunk_size': chunk_size,
'overlap': overlap
}
def detect_ai(self, text: str) -> Dict[str, Any]:
"""
AI detection with chunk-based mixed text analysis
Args:
text: Input text
Returns:
Detection results with sentiment features and mixed text analysis
"""
# Use chunk-based detection for better mixed text handling
chunk_result = self.detect_mixed_text_chunk_based(text)
# Get sentiment features for explanation
sentiment_features = self.extract_sentiment_features(text)
avg_pol = float(sentiment_features[0])
pol_var = float(sentiment_features[1])
# Generate explanation based on prediction type
confidence_pct = chunk_result["confidence"] * 100
prediction = chunk_result["prediction"]
if confidence_pct > 90:
certainty = "very high confidence"
elif confidence_pct > 75:
certainty = "high confidence"
elif confidence_pct > 60:
certainty = "moderate confidence"
else:
certainty = "low confidence"
# Generate explanation based on prediction type
if prediction == "Mixed":
explanation = f"This text appears to be a mixture of AI-generated and human-authored text."
explanation += " This mixed composition suggests the text may have been collaboratively written or heavily edited."
# Add sentiment insights for mixed text
if pol_var > 0.60:
explanation += " High emotional variation across sections indicates significant style differences between parts."
elif pol_var >= 0.36:
explanation += " Moderate emotional variation suggests different writing styles in various sections."
else:
explanation += " Low emotional variation may indicate consistent editing or similar writing styles throughout."
elif prediction == "AI":
explanation = f"This text is classified as AI-Generated with {certainty}."
explanation += " The text is typically associated with AI-generated writing based on patterns, including uniform structure or predictable phrasing."
if pol_var <= 0.10:
explanation += " Very low emotional variation which is common in more structured or machine-generated texts."
elif pol_var <= 0.35:
explanation += " Low emotional variation which may align with AI patterns but can also occur in formal human writing."
elif pol_var <= 0.60:
explanation += " Moderate emotional variation which is less typical for AI but still possible depending on the prompt or model."
else:
explanation += " High emotional variation which is uncommon in AI outputs but may occur in certain complex or narrative prompts."
else: # Human
explanation = f"This text is classified as Human-Authored with {certainty}."
explanation += " The text shows patterns frequently observed in human writing, such as natural variations and flexible sentence structures."
if pol_var > 0.60:
explanation += " High emotional variation which often reflects expressive or opinionated writing."
elif pol_var >= 0.36:
explanation += " Moderate emotional variation which shows natural shifts in tone."
elif pol_var >= 0.11:
explanation += " Low emotional variation which may indicate formal or academic writing."
else:
explanation += " Very low emotional variation indicates consistent tone with focused perspective."
# Convert prediction to classification format for backward compatibility
classification_map = {"AI": "ai", "Human": "human", "Mixed": "mixed"}
classification = classification_map.get(prediction, "unknown")
return {
"classification": classification,
"prediction": prediction,
"probability": chunk_result["modified_probability"],
"confidence": confidence_pct,
"explanation": explanation,
"sentiment_features": {
"avg_polarity": avg_pol,
"polarity_variance": pol_var
},
"mixed_analysis": {
"is_mixed": chunk_result["is_mixed"],
"mixed_ratio": chunk_result.get("mixed_ratio", 0),
"human_chunks": chunk_result.get("human_chunks", 0),
"ai_chunks": chunk_result.get("ai_chunks", 0),
"total_chunks": chunk_result.get("total_chunks", 0),
"overall_probability": chunk_result["overall_probability"],
"modified_probability": chunk_result["modified_probability"]
},
"raw_chunks": chunk_result.get("raw_chunks", []),
"sentence_analysis": chunk_result.get("sentence_analysis", []),
"merged_spans": chunk_result.get("merged_spans", []),
"modelProcessingTime": time.perf_counter()
}
def analyze_text(self, text: str) -> Dict[str, Any]:
start_time = time.perf_counter()
"""
Comprehensive text analysis combining AI detection with sentiment features
Args:
text: Input text to analyze
Returns:
Complete analysis results with model-based sentiment features
"""
# Validate input text length (80-2000 words)
total_words = len(text.split())
if total_words < 80:
raise ValueError(f"Text too short for analysis ({total_words} words, minimum 80 words required)")
elif total_words > 2000:
raise ValueError(f"Text too long for analysis ({total_words} words, maximum 2000 words allowed)")
# Get AI detection results (includes sentiment features from model)
ai_detection = self.detect_ai(text)
mixed_analysis = ai_detection.get("mixed_analysis") or {}
modified_prob = mixed_analysis.get("modified_probability")
overall_prob = mixed_analysis.get("overall_probability")
primary_probability = None
for candidate in (modified_prob, overall_prob, ai_detection.get("probability")):
if isinstance(candidate, (int, float)):
primary_probability = float(candidate)
break
if primary_probability is None:
primary_probability = 0.0
ai_prob = max(0.0, min(1.0, primary_probability))
human_prob = 1.0 - ai_prob
probability_breakdown = {
"ai": ai_prob,
"human": human_prob
}
model_sentiment = ai_detection.get("sentiment_features", {})
# Perform basic text analysis
words = text.split()
sentences = [s.strip() for s in text.replace('!', '.').replace('?', '.').split('.') if s.strip()]
# Calculate basic metrics
word_count = len(words)
sentence_count = len(sentences)
avg_word_length = np.mean([len(w) for w in words]) if words else 0
avg_sentence_length = word_count / sentence_count if sentence_count > 0 else 0
# Determine complexity based on AI probability and text metrics
is_ai = ai_detection["classification"] == "ai"
is_mixed = ai_detection["classification"] == "mixed"
# Handle different prediction types
if is_mixed:
formality = "mixed"
complexity = "variable"
tone = "Mixed (AI/Human)"
audience = "Variable"
elif is_ai:
formality = "formal" if ai_prob > 0.7 else "neutral"
complexity = "complex" if avg_word_length > 6 else "moderate"
tone = "Professional"
audience = "General to Academic"
else:
formality = "casual" if avg_word_length < 5 else "neutral"
complexity = "simple" if avg_sentence_length < 15 else "moderate"
tone = "Conversational"
audience = "General Public"
# Generate insights based on detection results
insights = []
if is_mixed and ai_detection["confidence"] > 60:
insights.append({
"type": "observation",
"title": "Mixed Content Detected",
"description": f"This text contains both AI-generated and human-authored sections ({ai_detection['confidence']:.1f}% confidence).",
"suggestion": "Consider reviewing the text for consistency and ensuring all sections align with your intended voice and style."
})
insights.append({
"type": "observation",
"title": "Content Composition",
"description": f"Analysis found {mixed_analysis.get('human_chunks', 0)} human-like sections and {mixed_analysis.get('ai_chunks', 0)} AI-like sections.",
"suggestion": "The mixed nature suggests collaborative writing or heavy editing. Consider standardizing the writing style throughout."
})
elif is_ai and ai_detection["confidence"] > 75:
insights.append({
"type": "observation",
"title": "AI-Generated Content Detected",
"description": f"This text shows strong indicators associated with AI-generated writing ({ai_detection['confidence']:.1f}% confidence).",
"suggestion": "Consider adding personal insights, varied sentence structures, or unique perspectives to achieve a more unique voice."
})
elif not is_ai and ai_detection["confidence"] > 75:
insights.append({
"type": "strength",
"title": "Human Writing Characteristics",
"description": f"The text shows several features commonly found in human-authored writing ({ai_detection['confidence']:.1f}% confidence)."
})
# Sentence variety analysis
if sentence_count > 2:
sentence_lengths = [len(s.split()) for s in sentences]
std_dev = np.std(sentence_lengths)
if std_dev < 3:
insights.append({
"type": "improvement",
"title": "Sentence Variety",
"description": "Sentences have similar lengths, which this pattern may indicate AI generation.",
"suggestion": "Consider varying sentence length to create a more natural flow."
})
else:
insights.append({
"type": "strength",
"title": "Good Sentence Variety",
"description": "Text shows natural variation in sentence structure."
})
# Generate emotions based on model sentiment polarity (data-driven ranges)
avg_polarity = model_sentiment.get("avg_polarity", 0)
emotions = []
if avg_polarity >= 0.71:
emotions.append({"emotion": "very_positive", "score": min(abs(avg_polarity), 1.0), "intensity": "high"})
elif avg_polarity >= 0.30:
emotions.append({"emotion": "positive", "score": min(abs(avg_polarity), 1.0), "intensity": "medium"})
elif avg_polarity >= -0.29:
emotions.append({"emotion": "neutral", "score": 0.8, "intensity": "medium"})
else:
emotions.append({"emotion": "negative", "score": min(abs(avg_polarity), 1.0), "intensity": "high"})
# Construct full analysis response with model sentiment features
polarity_variance = model_sentiment.get("polarity_variance", 0)
end_time = time.perf_counter()
processing_seconds = round(end_time - start_time, 3) # exact seconds (millisecond precision)
logger.info(f"Model processing time for analyze_text: {processing_seconds:.3f}s")
return {
"advancedSentiment": {
"emotions": emotions,
"confidence": 70 + (ai_detection["confidence"] * 0.3),
"context": f"The text appears to be {'AI-Generated' if ai_detection['classification'] == 'ai' else 'Human-Authored'} based on linguistic patterns and sentiment analysis.",
"avg_polarity": model_sentiment.get("avg_polarity", 0),
"polarity_variance": polarity_variance
},
"topics": [
{
"topic": "General Content",
"relevance": 0.8,
"keywords": words[:5] if len(words) >= 5 else words
}
],
"writingStyle": {
"tone": tone,
"formality": formality,
"complexity": complexity,
"style": [formality, complexity, tone],
"audience": audience,
"sentiment_consistency": "very_low" if polarity_variance <= 0.10 else "low" if polarity_variance <= 0.35 else "moderate" if polarity_variance <= 0.60 else "high"
},
"insights": insights,
"plagiarismRisk": {
"score": int(ai_prob * 100) if is_ai else (int(ai_prob * 70) if is_mixed else 10),
"level": "high" if is_ai and ai_prob > 0.8 else "medium" if (is_ai or is_mixed) else "low",
"details": f"{'High' if is_ai else 'Moderate' if is_mixed else 'Low'} similarity to AI-generated patterns detected."
},
"contentQuality": {
"overall": int(85 - (ai_prob * 20)) if is_ai else (int(80 - (ai_prob * 15)) if is_mixed else 90),
"clarity": int(90 - (ai_prob * 10)) if not is_mixed else int(85 - (ai_prob * 8)),
"coherence": int(88 - (ai_prob * 8)) if not is_mixed else int(82 - (ai_prob * 6)),
"engagement": int(75 - (ai_prob * 25)) if not is_mixed else int(70 - (ai_prob * 20)),
"originality": int(60 - (ai_prob * 40)) if is_ai else (int(70 - (ai_prob * 30)) if is_mixed else 85)
},
"aiOrHuman": ai_detection["classification"],
"aiOrHumanConfidence": ai_detection["confidence"],
"aiOrHumanExplanation": ai_detection["explanation"],
"mixedAnalysis": mixed_analysis,
"probabilityBreakdown": probability_breakdown,
"rawChunks": ai_detection.get("raw_chunks", []),
"sentenceAnalysis": ai_detection.get("sentence_analysis", []),
"mergedSpans": ai_detection.get("merged_spans", []),
"modelProcessingTime": processing_seconds
}
def get_model_info(self) -> Dict[str, Any]:
"""Get information about the loaded models"""
return {
"model_loaded": self.model_loaded,
"model_path": self.model_path,
"device": str(self.device),
"max_length": self.max_length,
"architecture": "Two-Branch (DeBERTa + Sentiment Features)",
"primary_model": "DeBERTa-v3-large (desklib/ai-text-detector-v1.01)",
"sentiment_model": "DistilBERT-SST-2",
"classifier": "XGBoost" if self.xgboost_model is not None else "DeBERTa Linear",
"features": [
"DeBERTa embeddings (1024 dimensions)",
"Average sentiment polarity",
"Sentiment polarity variance"
],
"description": "Two-branch model for detecting AI-Generated vs Human-Authored text using DeBERTa semantic embeddings combined with sentiment features"
}
#