""" Model Handler for Two-Branch AI Detection Model Combines DeBERTa embeddings with sentiment features Uses XGBoost for final classification """ import torch import torch.nn as nn import torch.nn.functional as F from transformers import AutoTokenizer, AutoConfig, AutoModel, PreTrainedModel, AutoModelForSequenceClassification import os import logging import time from typing import Dict, Any, Optional, List, Tuple import numpy as np from pathlib import Path import xgboost as xgb import json import nltk from nltk.tokenize import sent_tokenize # Download NLTK data try: nltk.data.find('tokenizers/punkt') except LookupError: nltk.download('punkt', quiet=True) try: nltk.data.find('tokenizers/punkt_tab') except LookupError: nltk.download('punkt_tab', quiet=True) logger = logging.getLogger(__name__) class DesklibAIDetectionModel(PreTrainedModel): """ DeBERTa-based AI detection model Architecture from desklib/ai-text-detector-v1.01 """ config_class = AutoConfig def __init__(self, config): super().__init__(config) # Initialize the base transformer model self.model = AutoModel.from_config(config) # Define a classifier head self.classifier = nn.Linear(config.hidden_size, 1) # Initialize weights self.init_weights() def forward(self, input_ids, attention_mask=None, labels=None): # Forward pass through the transformer outputs = self.model(input_ids, attention_mask=attention_mask) last_hidden_state = outputs[0] # Mean pooling input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float() sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, dim=1) sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9) pooled_output = sum_embeddings / sum_mask # Classifier logits = self.classifier(pooled_output) loss = None if labels is not None: loss_fct = nn.BCEWithLogitsLoss() loss = loss_fct(logits.view(-1), labels.float()) output = {"logits": logits} if loss is not None: output["loss"] = loss return output class AIDetectionModelHandler: """ Handles Two-Branch AI detection: - DeBERTa for semantic embeddings - Sentiment features (avg_polarity, polarity_variance) - XGBoost for final classification """ def __init__(self, model_path: Optional[str] = None, max_length: int = 512): """ Initialize the model handler Args: model_path: Path to the model directory (default: ../model/model) max_length: Maximum token length for input text """ self.max_length = max_length self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.deberta_model = None self.tokenizer = None self.sentiment_model = None self.sentiment_tokenizer = None self.xgboost_model = None self.model_loaded = False # Default model paths if model_path is None: # Prefer explicit env var env_model_path = os.getenv("MODEL_PATH") if env_model_path and os.path.exists(env_model_path): model_path = env_model_path elif os.path.exists("/app/model"): model_path = "/app/model" else: # Fallback to legacy relative path backend_dir = Path(__file__).parent model_path = str(backend_dir.parent / "model" / "model") self.model_path = model_path # XGBoost file is expected inside the same folder as the other model artifacts self.xgboost_path = str(Path(model_path) / "xgboost_model.json") # Load the models self._load_models() def _load_models(self): """Load DeBERTa, sentiment model, and XGBoost classifier""" try: logger.info(f"CUDA available: {torch.cuda.is_available()}") logger.info(f"Selected device: {self.device}") logger.info(f"Loading models from: {self.model_path}") logger.info(f"Using device: {self.device}") # Check if model path exists if not os.path.exists(self.model_path): logger.error(f"Model path does not exist: {self.model_path}") raise FileNotFoundError(f"Model not found at {self.model_path}") # 1. Load DeBERTa tokenizer and model logger.info("Loading DeBERTa tokenizer...") self.tokenizer = AutoTokenizer.from_pretrained(self.model_path) logger.info("Loading DeBERTa model...") self.deberta_model = DesklibAIDetectionModel.from_pretrained(self.model_path) self.deberta_model.to(self.device) self.deberta_model.eval() print("DeBERTa model device:", next(self.deberta_model.parameters()).device) # 2. Load sentiment analysis model (DistilBERT) logger.info("Loading sentiment model...") sentiment_model_name = "distilbert-base-uncased-finetuned-sst-2-english" self.sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name) self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name) self.sentiment_model.to(self.device) self.sentiment_model.eval() print("Sentiment model device:", next(self.sentiment_model.parameters()).device) # 3. Load XGBoost model if os.path.exists(self.xgboost_path): logger.info(f"Loading XGBoost model from: {self.xgboost_path}") t0 = time.perf_counter() self.xgboost_model = xgb.Booster() self.xgboost_model.load_model(self.xgboost_path) # Force GPU or CPU depending on hardware if torch.cuda.is_available(): logger.info("Setting XGBoost to use GPU predictor") try: self.xgboost_model.set_param({"predictor": "gpu_predictor", "tree_method": "gpu_hist"}) logger.info("XGBoost configured to use GPU (gpu_predictor, gpu_hist)") except Exception as ie: logger.warning(f"Failed to set XGBoost GPU params: {ie}") else: logger.info("Setting XGBoost to use CPU predictor") try: self.xgboost_model.set_param({"predictor": "cpu_predictor", "tree_method": "hist"}) except Exception as ie: logger.warning(f"Failed to set XGBoost CPU params: {ie}") t1 = time.perf_counter() logger.info(f"XGBoost model loaded in {t1 - t0:.4f}s") logger.info("✅ XGBoost model loaded!") else: logger.warning(f"XGBoost model not found at {self.xgboost_path}, using DeBERTa only") self.xgboost_model = None # 🔍 OPTIONAL: PRINT GPU NAME if torch.cuda.is_available(): print("GPU detected:", torch.cuda.get_device_name(0)) self.model_loaded = True logger.info("✅ All models loaded successfully!") except Exception as e: logger.error(f"Failed to load models: {e}", exc_info=True) self.model_loaded = False raise def is_loaded(self) -> bool: """Check if model is loaded""" return self.model_loaded def get_sentiment_scores(self, text: str) -> List[float]: """ Extract sentiment scores for each sentence using DistilBERT Args: text: Input text Returns: List of sentiment scores (polarity) for each sentence """ try: # Tokenize into sentences sentences = sent_tokenize(text) if not sentences: return [0.5] # Neutral if no sentences scores = [] start_total = time.perf_counter() with torch.no_grad(): for i, sentence in enumerate(sentences): s0 = time.perf_counter() # Tokenize sentence inputs = self.sentiment_tokenizer( sentence, return_tensors="pt", padding=True, truncation=True, max_length=512 ) inputs = {k: v.to(self.device) for k, v in inputs.items()} # Get sentiment prediction outputs = self.sentiment_model(**inputs) logits = outputs.logits probabilities = F.softmax(logits, dim=-1) # Get positive sentiment probability (index 1) pos_prob = probabilities[0][1].item() # Convert to polarity score (-1 to 1, where 0.5 is neutral) polarity = (pos_prob - 0.5) * 2 # Maps [0,1] to [-1,1] scores.append(polarity) s1 = time.perf_counter() logger.debug(f"Sentiment sentence processed in {s1 - s0:.4f}s") total_time = time.perf_counter() - start_total logger.info(f"Extracted sentiment scores for {len(sentences)} sentences in {total_time:.4f}s") return scores except Exception as e: logger.error(f"Error extracting sentiment scores: {e}") return [0.0] # Return neutral on error def extract_sentiment_features(self, text: str) -> np.ndarray: """ Extract avg_polarity and polarity_variance from text Args: text: Input text Returns: Numpy array with [avg_polarity, polarity_variance] """ start = time.perf_counter() sentiment_scores = self.get_sentiment_scores(text) # Calculate features avg_polarity = float(np.mean(sentiment_scores)) if sentiment_scores else 0.0 polarity_variance = float(np.var(sentiment_scores)) if len(sentiment_scores) > 1 else 0.0 duration = time.perf_counter() - start logger.info(f"Sentiment features extracted in {duration:.4f}s (avg_polarity={avg_polarity:.4f}, variance={polarity_variance:.4f})") return np.array([avg_polarity, polarity_variance], dtype=np.float32) def get_deberta_embeddings(self, text: str) -> np.ndarray: """ Get DeBERTa embeddings for text using mean pooling Args: text: Input text Returns: Numpy array of embeddings """ try: t_total = time.perf_counter() # Tokenize input t0 = time.perf_counter() encoded = self.tokenizer( text, padding='max_length', truncation=True, max_length=self.max_length, return_tensors='pt' ) t1 = time.perf_counter() logger.debug(f"Tokenization time: {t1 - t0:.4f}s") input_ids = encoded['input_ids'].to(self.device) attention_mask = encoded['attention_mask'].to(self.device) # Get embeddings with torch.no_grad(): t0 = time.perf_counter() outputs = self.deberta_model.model(input_ids=input_ids, attention_mask=attention_mask) t1 = time.perf_counter() logger.debug(f"Transformer forward pass time: {t1 - t0:.4f}s") last_hidden_state = outputs[0] # Mean pooling t0 = time.perf_counter() input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float() sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, dim=1) sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9) pooled_output = sum_embeddings / sum_mask t1 = time.perf_counter() logger.debug(f"Pooling time: {t1 - t0:.4f}s") # Convert to numpy embeddings = pooled_output.cpu().numpy().flatten() total = time.perf_counter() - t_total return embeddings except Exception as e: logger.error(f"Error extracting DeBERTa embeddings: {e}", exc_info=True) raise def predict_probability(self, text: str, threshold: float = 0.5) -> Dict[str, Any]: """ Predict if text is AI-generated using two-branch architecture Args: text: Input text to analyze threshold: Classification threshold (default: 0.5) Returns: Dictionary with probability, label, sentiment features """ if not self.model_loaded: raise RuntimeError("Model not loaded. Cannot perform prediction.") try: overall_start = time.perf_counter() # Extract sentiment features logger.info("Extracting sentiment features...") sentiment_start = time.perf_counter() sentiment_features = self.extract_sentiment_features(text) sentiment_time = time.perf_counter() - sentiment_start avg_polarity = float(sentiment_features[0]) polarity_variance = float(sentiment_features[1]) logger.info(f"Sentiment extraction took {sentiment_time:.4f}s") # If XGBoost is available, use the full two-branch pipeline if self.xgboost_model is not None: logger.info("Using XGBoost two-branch model...") embed_start = time.perf_counter() # Get DeBERTa embeddings deberta_embeddings = self.get_deberta_embeddings(text) embed_time = time.perf_counter() - embed_start logger.info(f"DeBERTa embedding extraction took {embed_time:.4f}s") # Combine features: DeBERTa embeddings + sentiment features combined_features = np.concatenate([deberta_embeddings, sentiment_features]) # Create DMatrix for XGBoost dmatrix = xgb.DMatrix(combined_features.reshape(1, -1)) # Predict xgb_start = time.perf_counter() probability = float(self.xgboost_model.predict(dmatrix)[0]) xgb_time = time.perf_counter() - xgb_start logger.info(f"XGBoost prediction took {xgb_time:.4f}s") else: # Fallback to DeBERTa only logger.info("Using DeBERTa model only (XGBoost not found)...") encoded = self.tokenizer( text, padding='max_length', truncation=True, max_length=self.max_length, return_tensors='pt' ) input_ids = encoded['input_ids'].to(self.device) attention_mask = encoded['attention_mask'].to(self.device) with torch.no_grad(): t0 = time.perf_counter() outputs = self.deberta_model(input_ids=input_ids, attention_mask=attention_mask) t1 = time.perf_counter() logger.info(f"DeBERTa forward & classification took {t1 - t0:.4f}s") logits = outputs["logits"] probability = torch.sigmoid(logits).item() label = 1 if probability >= threshold else 0 overall_time = time.perf_counter() - overall_start logger.info(f"Total prediction pipeline took {overall_time:.4f}s (prob={probability:.4f})") return { "probability": probability, "label": label, "classification": "ai" if label == 1 else "human", "confidence": probability if label == 1 else (1 - probability), "sentiment_features": { "avg_polarity": avg_polarity, "polarity_variance": polarity_variance } } except Exception as e: logger.error(f"Prediction error: {e}", exc_info=True) raise def predict_single_text_xgboost(self, text: str) -> Tuple[float, int]: """ Predict AI probability and label for a single text using XGBoost model Args: text: Input text to analyze Returns: Tuple of (probability, label) where label is 0 for human, 1 for AI """ try: start_total = time.perf_counter() # Extract sentiment features sentiment_features = self.extract_sentiment_features(text) avg_polarity = float(sentiment_features[0]) polarity_variance = float(sentiment_features[1]) # If XGBoost is available, use the full two-branch pipeline if self.xgboost_model is not None: embed_start = time.perf_counter() # Get DeBERTa embeddings deberta_embeddings = self.get_deberta_embeddings(text) embed_time = time.perf_counter() - embed_start logger.info(f"DeBERTa embedding extraction took {embed_time:.4f}s") # Combine features: DeBERTa embeddings + sentiment features combined_features = np.concatenate([deberta_embeddings, sentiment_features]) # Create DMatrix for XGBoost dmatrix = xgb.DMatrix(combined_features.reshape(1, -1)) xgb_start = time.perf_counter() # Predict probability = float(self.xgboost_model.predict(dmatrix)[0]) xgb_time = time.perf_counter() - xgb_start logger.info(f"XGBoost prediction (single) took {xgb_time:.4f}s") else: # Fallback to DeBERTa only encoded = self.tokenizer( text, padding='max_length', truncation=True, max_length=self.max_length, return_tensors='pt' ) input_ids = encoded['input_ids'].to(self.device) attention_mask = encoded['attention_mask'].to(self.device) with torch.no_grad(): t0 = time.perf_counter() outputs = self.deberta_model(input_ids=input_ids, attention_mask=attention_mask) t1 = time.perf_counter() logger.info(f"DeBERTa forward (single) took {t1 - t0:.4f}s") logits = outputs["logits"] probability = torch.sigmoid(logits).item() label = 1 if probability >= 0.5 else 0 total = time.perf_counter() - start_total logger.info(f"predict_single_text_xgboost total time: {total:.4f}s") return probability, label except Exception as e: logger.error(f"Single text prediction error: {e}", exc_info=True) raise def detect_mixed_text_chunk_based(self, text: str, chunk_size: int = 4, overlap: int = 1, min_chunk_length: int = 50) -> Dict[str, Any]: """ Improved mixed text detection using chunk-based analysis that influences overall probability Args: text: Input text string chunk_size: Number of sentences per chunk (default: 4) overlap: Number of sentences to overlap between chunks (default: 1) min_chunk_length: Minimum character length for a chunk to be analyzed Returns: Dictionary with prediction results and analysis details Note: Input validation: Text must be 80-2000 words. Dynamic chunking: 4-5 sentences analyzed as whole, then chunk size varies: - 6-10 sentences: 3 sentences per chunk - 11-20 sentences: 4 sentences per chunk - 21-30 sentences: 5 sentences per chunk - 31+ sentences: 6 sentences per chunk Uses overlapping chunks to capture transitions between AI and human content. """ # Get overall prediction (your current method) overall_prob, overall_label = self.predict_single_text_xgboost(text) # Split text into sentences sentences = sent_tokenize(text) # Validate input text length (80-2000 words) total_words = len(text.split()) if total_words < 80: return { 'prediction': 'Human' if overall_label == 0 else 'AI', 'confidence': abs(overall_prob - 0.5) * 2, 'is_mixed': False, 'reason': f'Text too short for analysis ({total_words} words, minimum 80 words required)', 'overall_probability': overall_prob, 'modified_probability': overall_prob, 'chunk_analysis': [] } elif total_words > 2000: return { 'prediction': 'Human' if overall_label == 0 else 'AI', 'confidence': abs(overall_prob - 0.5) * 2, 'is_mixed': False, 'reason': f'Text too long for analysis ({total_words} words, maximum 2000 words allowed)', 'overall_probability': overall_prob, 'modified_probability': overall_prob, 'chunk_analysis': [] } # Compute sentence character offsets (start/end) to map back to original text sentence_offsets: List[Tuple[int, int]] = [] search_start = 0 for sent in sentences: # find the sentence occurrence starting from search_start idx = text.find(sent, search_start) if idx == -1: # fallback: skip whitespace and set to previous end idx = search_start start_char = idx end_char = start_char + len(sent) sentence_offsets.append((start_char, end_char)) search_start = end_char # Dynamic chunking based on total sentence count total_sentences = len(sentences) # For 4-5 sentences, analyze as whole (no chunking) if total_sentences <= 5: return { 'prediction': 'Human' if overall_label == 0 else 'AI', 'confidence': abs(overall_prob - 0.5) * 2, 'is_mixed': False, 'reason': f'Analyzing {total_sentences} sentences as whole (4-5 sentence range)', 'overall_probability': overall_prob, 'modified_probability': overall_prob, 'chunk_analysis': [] } # Dynamic chunk size based on total sentences if total_sentences <= 10: dynamic_chunk_size = 3 elif total_sentences <= 20: dynamic_chunk_size = 4 elif total_sentences <= 30: dynamic_chunk_size = 5 else: dynamic_chunk_size = 6 # For very long texts # Ensure we have enough sentences for at least 2 chunks if total_sentences < dynamic_chunk_size * 2: return { 'prediction': 'Human' if overall_label == 0 else 'AI', 'confidence': abs(overall_prob - 0.5) * 2, 'is_mixed': False, 'reason': f'Text too short for chunk analysis ({total_sentences} sentences, need at least {dynamic_chunk_size * 2})', 'overall_probability': overall_prob, 'modified_probability': overall_prob, 'chunk_analysis': [] } # Create overlapping chunks and retain sentence index ranges chunks = [] # textual chunks (for backward compat) chunk_sentence_ranges: List[Tuple[int, int]] = [] # inclusive start, inclusive end sentence idx chunk_predictions: List[Tuple[float, int]] = [] chunk_probabilities: List[float] = [] logger.info(f"Analyzing text with {total_sentences} sentences using dynamic chunk size of {dynamic_chunk_size}...") for i in range(0, len(sentences) - dynamic_chunk_size + 1, dynamic_chunk_size - overlap): # Create chunk from sentences start_idx = i end_idx = i + dynamic_chunk_size - 1 chunk_sentences = sentences[start_idx:end_idx + 1] chunk_text = ' '.join(chunk_sentences) # Only analyze chunks that meet minimum length requirement if len(chunk_text.strip()) >= min_chunk_length: chunks.append(chunk_text) chunk_sentence_ranges.append((start_idx, end_idx)) # Analyze this chunk prob, label = self.predict_single_text_xgboost(chunk_text) chunk_predictions.append((prob, label)) chunk_probabilities.append(prob) logger.info(f" Chunk {len(chunks)}: {chunk_text[:60]}... → {'AI' if label == 1 else 'Human'} ({prob:.3f})") if len(chunk_predictions) < 2: return { 'prediction': 'Human' if overall_label == 0 else 'AI', 'confidence': abs(overall_prob - 0.5) * 2, 'is_mixed': False, 'reason': 'Too few chunks for mixed analysis', 'overall_probability': overall_prob, 'modified_probability': overall_prob, 'chunk_probabilities': chunk_probabilities, 'raw_chunks': [], 'sentence_analysis': [], 'merged_spans': [], 'chunk_analysis': chunk_predictions } # Count human vs AI chunks human_chunks = sum(1 for _, label in chunk_predictions if label == 0) ai_chunks = sum(1 for _, label in chunk_predictions if label == 1) total_chunks = len(chunk_predictions) # Mixed text detection logic is_mixed = human_chunks > 0 and ai_chunks > 0 mixed_ratio = min(human_chunks, ai_chunks) / total_chunks chunk_avg_prob = float(np.mean(chunk_probabilities)) if chunk_probabilities else overall_prob chunk_label = 'AI' if chunk_avg_prob >= 0.5 else 'Human' logger.info(f"\nChunk Analysis Summary:") logger.info(f" Total chunks analyzed: {total_chunks}") logger.info(f" Human chunks: {human_chunks}") logger.info(f" AI chunks: {ai_chunks}") logger.info(f" Mixed ratio: {mixed_ratio:.2f}") logger.info(f" Average chunk probability: {chunk_avg_prob:.3f}") logger.info(f" Chunk-derived label: {chunk_label}") if is_mixed: final_prediction = 'Mixed' modified_prob = chunk_avg_prob confidence = 1.0 - mixed_ratio logger.info(" → MIXED TEXT DETECTED (chunk-based)") else: final_prediction = chunk_label modified_prob = chunk_avg_prob confidence = abs(chunk_avg_prob - 0.5) * 2 logger.info(f" → Pure {chunk_label} text based on chunk probabilities") # Build detailed raw_chunks with character offsets raw_chunks: List[Dict[str, Any]] = [] for idx, ((prob, label), (sent_start, sent_end)) in enumerate(zip(chunk_predictions, chunk_sentence_ranges)): # Map sentence indices to char offsets start_char = sentence_offsets[sent_start][0] if sent_start < len(sentence_offsets) else 0 end_char = sentence_offsets[sent_end][1] if sent_end < len(sentence_offsets) else len(text) chunk_text = text[start_char:end_char] raw_chunks.append({ 'chunk_index': idx, 'start_char': start_char, 'end_char': end_char, 'text': chunk_text, 'probability': float(prob), 'label': 'ai' if label == 1 else 'human', 'sentence_range': [sent_start, sent_end] }) # Compute per-sentence aggregated probabilities and labels (weighted by chunk presence) sentence_analysis: List[Dict[str, Any]] = [] for si in range(len(sentences)): # Find chunks covering this sentence covering_probs: List[float] = [] covering_labels: List[int] = [] for (prob, label), (cs, ce) in zip(chunk_predictions, chunk_sentence_ranges): if cs <= si <= ce: covering_probs.append(prob) covering_labels.append(label) if covering_probs: avg_prob = float(np.mean(covering_probs)) # Use weighted/average probability as primary signal, but also # consider chunk label majority with a safety threshold. # Tighten AI labeling by requiring a higher probability threshold # to reduce false positives from noisy chunks. label_frac = float(np.mean(covering_labels)) if covering_labels else 0.0 AI_PROB_THRESHOLD = 0.55 # If average probability is confidently AI, mark as AI. if avg_prob >= AI_PROB_THRESHOLD: sentence_label = 'ai' # Otherwise, if majority of covering chunks are labeled AI and # probability is at least 0.5, mark as AI (minority case). elif label_frac > 0.5 and avg_prob >= 0.5: sentence_label = 'ai' else: sentence_label = 'human' else: # No covering chunks: use nearest-chunk fallback (prefer previous chunk, # otherwise next chunk). This avoids falling back to the global overall_prob # which can make trailing sentences inherit the global label. nearest_prob = None nearest_label = None # find previous chunk index (the last chunk that ends before this sentence) prev_idx = None for idx, (cs, ce) in enumerate(chunk_sentence_ranges): if ce < si: prev_idx = idx if prev_idx is not None: nearest_prob, nearest_label = chunk_predictions[prev_idx] else: # find next chunk index (the first chunk that starts after this sentence) next_idx = None for idx, (cs, ce) in enumerate(chunk_sentence_ranges): if cs > si: next_idx = idx break if next_idx is not None: nearest_prob, nearest_label = chunk_predictions[next_idx] if nearest_prob is not None: avg_prob = float(nearest_prob) sentence_label = 'ai' if nearest_label == 1 else 'human' else: # Fallback to overall prediction if there are truly no chunks avg_prob = overall_prob sentence_label = 'ai' if overall_label == 1 else 'human' start_c, end_c = sentence_offsets[si] if si < len(sentence_offsets) else (0, 0) sentence_analysis.append({ 'sentence_index': si, 'start_char': start_c, 'end_char': end_c, 'text': sentences[si], 'avg_probability': avg_prob, 'label': sentence_label }) # Merge adjacent sentences with same label into non-overlapping spans for easy frontend rendering merged_spans: List[Dict[str, Any]] = [] if sentence_analysis: cur = sentence_analysis[0] cur_start = cur['start_char'] cur_end = cur['end_char'] cur_label = cur['label'] cur_probs = [cur['avg_probability']] for s in sentence_analysis[1:]: if s['label'] == cur_label: # extend current span cur_end = s['end_char'] cur_probs.append(s['avg_probability']) else: merged_spans.append({ 'start_char': cur_start, 'end_char': cur_end, 'label': cur_label, 'avg_probability': float(np.mean(cur_probs)) }) # start a new span cur_start = s['start_char'] cur_end = s['end_char'] cur_label = s['label'] cur_probs = [s['avg_probability']] # append final span merged_spans.append({ 'start_char': cur_start, 'end_char': cur_end, 'label': cur_label, 'avg_probability': float(np.mean(cur_probs)) }) return { 'prediction': final_prediction, 'confidence': confidence, 'is_mixed': is_mixed, 'mixed_ratio': mixed_ratio, 'human_chunks': human_chunks, 'ai_chunks': ai_chunks, 'total_chunks': total_chunks, 'overall_probability': overall_prob, 'modified_probability': modified_prob, 'chunk_probabilities': chunk_probabilities, 'chunk_analysis': chunk_predictions, 'raw_chunks': raw_chunks, 'sentence_analysis': sentence_analysis, 'merged_spans': merged_spans, 'chunk_size': chunk_size, 'overlap': overlap } def detect_ai(self, text: str) -> Dict[str, Any]: """ AI detection with chunk-based mixed text analysis Args: text: Input text Returns: Detection results with sentiment features and mixed text analysis """ # Use chunk-based detection for better mixed text handling chunk_result = self.detect_mixed_text_chunk_based(text) # Get sentiment features for explanation sentiment_features = self.extract_sentiment_features(text) avg_pol = float(sentiment_features[0]) pol_var = float(sentiment_features[1]) # Generate explanation based on prediction type confidence_pct = chunk_result["confidence"] * 100 prediction = chunk_result["prediction"] if confidence_pct > 90: certainty = "very high confidence" elif confidence_pct > 75: certainty = "high confidence" elif confidence_pct > 60: certainty = "moderate confidence" else: certainty = "low confidence" # Generate explanation based on prediction type if prediction == "Mixed": explanation = f"This text appears to be a mixture of AI-generated and human-authored text." explanation += " This mixed composition suggests the text may have been collaboratively written or heavily edited." # Add sentiment insights for mixed text if pol_var > 0.60: explanation += " High emotional variation across sections indicates significant style differences between parts." elif pol_var >= 0.36: explanation += " Moderate emotional variation suggests different writing styles in various sections." else: explanation += " Low emotional variation may indicate consistent editing or similar writing styles throughout." elif prediction == "AI": explanation = f"This text is classified as AI-Generated with {certainty}." explanation += " The text is typically associated with AI-generated writing based on patterns, including uniform structure or predictable phrasing." if pol_var <= 0.10: explanation += " Very low emotional variation which is common in more structured or machine-generated texts." elif pol_var <= 0.35: explanation += " Low emotional variation which may align with AI patterns but can also occur in formal human writing." elif pol_var <= 0.60: explanation += " Moderate emotional variation which is less typical for AI but still possible depending on the prompt or model." else: explanation += " High emotional variation which is uncommon in AI outputs but may occur in certain complex or narrative prompts." else: # Human explanation = f"This text is classified as Human-Authored with {certainty}." explanation += " The text shows patterns frequently observed in human writing, such as natural variations and flexible sentence structures." if pol_var > 0.60: explanation += " High emotional variation which often reflects expressive or opinionated writing." elif pol_var >= 0.36: explanation += " Moderate emotional variation which shows natural shifts in tone." elif pol_var >= 0.11: explanation += " Low emotional variation which may indicate formal or academic writing." else: explanation += " Very low emotional variation indicates consistent tone with focused perspective." # Convert prediction to classification format for backward compatibility classification_map = {"AI": "ai", "Human": "human", "Mixed": "mixed"} classification = classification_map.get(prediction, "unknown") return { "classification": classification, "prediction": prediction, "probability": chunk_result["modified_probability"], "confidence": confidence_pct, "explanation": explanation, "sentiment_features": { "avg_polarity": avg_pol, "polarity_variance": pol_var }, "mixed_analysis": { "is_mixed": chunk_result["is_mixed"], "mixed_ratio": chunk_result.get("mixed_ratio", 0), "human_chunks": chunk_result.get("human_chunks", 0), "ai_chunks": chunk_result.get("ai_chunks", 0), "total_chunks": chunk_result.get("total_chunks", 0), "overall_probability": chunk_result["overall_probability"], "modified_probability": chunk_result["modified_probability"] }, "raw_chunks": chunk_result.get("raw_chunks", []), "sentence_analysis": chunk_result.get("sentence_analysis", []), "merged_spans": chunk_result.get("merged_spans", []), "modelProcessingTime": time.perf_counter() } def analyze_text(self, text: str) -> Dict[str, Any]: start_time = time.perf_counter() """ Comprehensive text analysis combining AI detection with sentiment features Args: text: Input text to analyze Returns: Complete analysis results with model-based sentiment features """ # Validate input text length (80-2000 words) total_words = len(text.split()) if total_words < 80: raise ValueError(f"Text too short for analysis ({total_words} words, minimum 80 words required)") elif total_words > 2000: raise ValueError(f"Text too long for analysis ({total_words} words, maximum 2000 words allowed)") # Get AI detection results (includes sentiment features from model) ai_detection = self.detect_ai(text) mixed_analysis = ai_detection.get("mixed_analysis") or {} modified_prob = mixed_analysis.get("modified_probability") overall_prob = mixed_analysis.get("overall_probability") primary_probability = None for candidate in (modified_prob, overall_prob, ai_detection.get("probability")): if isinstance(candidate, (int, float)): primary_probability = float(candidate) break if primary_probability is None: primary_probability = 0.0 ai_prob = max(0.0, min(1.0, primary_probability)) human_prob = 1.0 - ai_prob probability_breakdown = { "ai": ai_prob, "human": human_prob } model_sentiment = ai_detection.get("sentiment_features", {}) # Perform basic text analysis words = text.split() sentences = [s.strip() for s in text.replace('!', '.').replace('?', '.').split('.') if s.strip()] # Calculate basic metrics word_count = len(words) sentence_count = len(sentences) avg_word_length = np.mean([len(w) for w in words]) if words else 0 avg_sentence_length = word_count / sentence_count if sentence_count > 0 else 0 # Determine complexity based on AI probability and text metrics is_ai = ai_detection["classification"] == "ai" is_mixed = ai_detection["classification"] == "mixed" # Handle different prediction types if is_mixed: formality = "mixed" complexity = "variable" tone = "Mixed (AI/Human)" audience = "Variable" elif is_ai: formality = "formal" if ai_prob > 0.7 else "neutral" complexity = "complex" if avg_word_length > 6 else "moderate" tone = "Professional" audience = "General to Academic" else: formality = "casual" if avg_word_length < 5 else "neutral" complexity = "simple" if avg_sentence_length < 15 else "moderate" tone = "Conversational" audience = "General Public" # Generate insights based on detection results insights = [] if is_mixed and ai_detection["confidence"] > 60: insights.append({ "type": "observation", "title": "Mixed Content Detected", "description": f"This text contains both AI-generated and human-authored sections ({ai_detection['confidence']:.1f}% confidence).", "suggestion": "Consider reviewing the text for consistency and ensuring all sections align with your intended voice and style." }) insights.append({ "type": "observation", "title": "Content Composition", "description": f"Analysis found {mixed_analysis.get('human_chunks', 0)} human-like sections and {mixed_analysis.get('ai_chunks', 0)} AI-like sections.", "suggestion": "The mixed nature suggests collaborative writing or heavy editing. Consider standardizing the writing style throughout." }) elif is_ai and ai_detection["confidence"] > 75: insights.append({ "type": "observation", "title": "AI-Generated Content Detected", "description": f"This text shows strong indicators associated with AI-generated writing ({ai_detection['confidence']:.1f}% confidence).", "suggestion": "Consider adding personal insights, varied sentence structures, or unique perspectives to achieve a more unique voice." }) elif not is_ai and ai_detection["confidence"] > 75: insights.append({ "type": "strength", "title": "Human Writing Characteristics", "description": f"The text shows several features commonly found in human-authored writing ({ai_detection['confidence']:.1f}% confidence)." }) # Sentence variety analysis if sentence_count > 2: sentence_lengths = [len(s.split()) for s in sentences] std_dev = np.std(sentence_lengths) if std_dev < 3: insights.append({ "type": "improvement", "title": "Sentence Variety", "description": "Sentences have similar lengths, which this pattern may indicate AI generation.", "suggestion": "Consider varying sentence length to create a more natural flow." }) else: insights.append({ "type": "strength", "title": "Good Sentence Variety", "description": "Text shows natural variation in sentence structure." }) # Generate emotions based on model sentiment polarity (data-driven ranges) avg_polarity = model_sentiment.get("avg_polarity", 0) emotions = [] if avg_polarity >= 0.71: emotions.append({"emotion": "very_positive", "score": min(abs(avg_polarity), 1.0), "intensity": "high"}) elif avg_polarity >= 0.30: emotions.append({"emotion": "positive", "score": min(abs(avg_polarity), 1.0), "intensity": "medium"}) elif avg_polarity >= -0.29: emotions.append({"emotion": "neutral", "score": 0.8, "intensity": "medium"}) else: emotions.append({"emotion": "negative", "score": min(abs(avg_polarity), 1.0), "intensity": "high"}) # Construct full analysis response with model sentiment features polarity_variance = model_sentiment.get("polarity_variance", 0) end_time = time.perf_counter() processing_seconds = round(end_time - start_time, 3) # exact seconds (millisecond precision) logger.info(f"Model processing time for analyze_text: {processing_seconds:.3f}s") return { "advancedSentiment": { "emotions": emotions, "confidence": 70 + (ai_detection["confidence"] * 0.3), "context": f"The text appears to be {'AI-Generated' if ai_detection['classification'] == 'ai' else 'Human-Authored'} based on linguistic patterns and sentiment analysis.", "avg_polarity": model_sentiment.get("avg_polarity", 0), "polarity_variance": polarity_variance }, "topics": [ { "topic": "General Content", "relevance": 0.8, "keywords": words[:5] if len(words) >= 5 else words } ], "writingStyle": { "tone": tone, "formality": formality, "complexity": complexity, "style": [formality, complexity, tone], "audience": audience, "sentiment_consistency": "very_low" if polarity_variance <= 0.10 else "low" if polarity_variance <= 0.35 else "moderate" if polarity_variance <= 0.60 else "high" }, "insights": insights, "plagiarismRisk": { "score": int(ai_prob * 100) if is_ai else (int(ai_prob * 70) if is_mixed else 10), "level": "high" if is_ai and ai_prob > 0.8 else "medium" if (is_ai or is_mixed) else "low", "details": f"{'High' if is_ai else 'Moderate' if is_mixed else 'Low'} similarity to AI-generated patterns detected." }, "contentQuality": { "overall": int(85 - (ai_prob * 20)) if is_ai else (int(80 - (ai_prob * 15)) if is_mixed else 90), "clarity": int(90 - (ai_prob * 10)) if not is_mixed else int(85 - (ai_prob * 8)), "coherence": int(88 - (ai_prob * 8)) if not is_mixed else int(82 - (ai_prob * 6)), "engagement": int(75 - (ai_prob * 25)) if not is_mixed else int(70 - (ai_prob * 20)), "originality": int(60 - (ai_prob * 40)) if is_ai else (int(70 - (ai_prob * 30)) if is_mixed else 85) }, "aiOrHuman": ai_detection["classification"], "aiOrHumanConfidence": ai_detection["confidence"], "aiOrHumanExplanation": ai_detection["explanation"], "mixedAnalysis": mixed_analysis, "probabilityBreakdown": probability_breakdown, "rawChunks": ai_detection.get("raw_chunks", []), "sentenceAnalysis": ai_detection.get("sentence_analysis", []), "mergedSpans": ai_detection.get("merged_spans", []), "modelProcessingTime": processing_seconds } def get_model_info(self) -> Dict[str, Any]: """Get information about the loaded models""" return { "model_loaded": self.model_loaded, "model_path": self.model_path, "device": str(self.device), "max_length": self.max_length, "architecture": "Two-Branch (DeBERTa + Sentiment Features)", "primary_model": "DeBERTa-v3-large (desklib/ai-text-detector-v1.01)", "sentiment_model": "DistilBERT-SST-2", "classifier": "XGBoost" if self.xgboost_model is not None else "DeBERTa Linear", "features": [ "DeBERTa embeddings (1024 dimensions)", "Average sentiment polarity", "Sentiment polarity variance" ], "description": "Two-branch model for detecting AI-Generated vs Human-Authored text using DeBERTa semantic embeddings combined with sentiment features" } #