| """ | |
| Model Handler for Two-Branch AI Detection Model | |
| Combines DeBERTa embeddings with sentiment features | |
| Uses XGBoost for final classification | |
| """ | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| from transformers import AutoTokenizer, AutoConfig, AutoModel, PreTrainedModel, AutoModelForSequenceClassification | |
| import os | |
| import logging | |
| import time | |
| from typing import Dict, Any, Optional, List, Tuple | |
| import numpy as np | |
| from pathlib import Path | |
| import xgboost as xgb | |
| import json | |
| import nltk | |
| from nltk.tokenize import sent_tokenize | |
# Download NLTK sentence-tokenizer data if it is not already present
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt', quiet=True)

try:
    nltk.data.find('tokenizers/punkt_tab')
except LookupError:
    nltk.download('punkt_tab', quiet=True)

logger = logging.getLogger(__name__)


class DesklibAIDetectionModel(PreTrainedModel):
    """
    DeBERTa-based AI detection model.

    Architecture from desklib/ai-text-detector-v1.01.
    """
    config_class = AutoConfig

    def __init__(self, config):
        super().__init__(config)
        # Initialize the base transformer model
        self.model = AutoModel.from_config(config)
        # Define a classifier head
        self.classifier = nn.Linear(config.hidden_size, 1)
        # Initialize weights
        self.init_weights()

    def forward(self, input_ids, attention_mask=None, labels=None):
        # Forward pass through the transformer
        outputs = self.model(input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs[0]

        # Masked mean pooling over the sequence dimension
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
        sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, dim=1)
        sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9)
        pooled_output = sum_embeddings / sum_mask
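        # Shapes for reference: last_hidden_state is (batch, seq_len, hidden),
        # attention_mask is (batch, seq_len). The pooled embedding is
        #   pooled[b] = sum_t(h[b, t] * mask[b, t]) / max(sum_t mask[b, t], 1e-9),
        # so padding positions contribute nothing to the sentence vector.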

        # Classifier head produces a single logit per input
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fct = nn.BCEWithLogitsLoss()
            loss = loss_fct(logits.view(-1), labels.float())

        output = {"logits": logits}
        if loss is not None:
            output["loss"] = loss
        return output
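

# Example of direct use (a sketch; "path/to/checkpoint" is a hypothetical local path):
#     model = DesklibAIDetectionModel.from_pretrained("path/to/checkpoint")
#     out = model(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])
#     prob = torch.sigmoid(out["logits"])  # probability that the text is AI-generated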


class AIDetectionModelHandler:
    """
    Handles two-branch AI detection:
    - DeBERTa for semantic embeddings
    - Sentiment features (avg_polarity, polarity_variance)
    - XGBoost for final classification
    """

    def __init__(self, model_path: Optional[str] = None, max_length: int = 512):
        """
        Initialize the model handler.

        Args:
            model_path: Path to the model directory (default: resolved from the
                MODEL_PATH env var, then /app/model, then the legacy ../model/model path)
            max_length: Maximum token length for input text
        """
        self.max_length = max_length
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.deberta_model = None
        self.tokenizer = None
        self.sentiment_model = None
        self.sentiment_tokenizer = None
        self.xgboost_model = None
        self.model_loaded = False

        # Resolve the model path
        if model_path is None:
            # Prefer an explicit environment variable
            env_model_path = os.getenv("MODEL_PATH")
            if env_model_path and os.path.exists(env_model_path):
                model_path = env_model_path
            elif os.path.exists("/app/model"):
                model_path = "/app/model"
            else:
                # Fall back to the legacy relative path
                backend_dir = Path(__file__).parent
                model_path = str(backend_dir.parent / "model" / "model")
        self.model_path = model_path

        # The XGBoost file is expected alongside the other model artifacts
        self.xgboost_path = str(Path(model_path) / "xgboost_model.json")

        # Load the models
        self._load_models()
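
        # _load_models expects the following layout (inferred from the loading
        # code below; an assumption about how the artifacts are packaged):
        #   <model_path>/config.json + tokenizer files + weights  -> DeBERTa checkpoint
        #   <model_path>/xgboost_model.json                       -> optional XGBoost booster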

    def _load_models(self):
        """Load DeBERTa, the sentiment model, and the XGBoost classifier."""
        try:
            logger.info(f"CUDA available: {torch.cuda.is_available()}")
            logger.info(f"Using device: {self.device}")
            logger.info(f"Loading models from: {self.model_path}")

            # Check that the model path exists
            if not os.path.exists(self.model_path):
                logger.error(f"Model path does not exist: {self.model_path}")
                raise FileNotFoundError(f"Model not found at {self.model_path}")

            # 1. Load the DeBERTa tokenizer and model
            logger.info("Loading DeBERTa tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)

            logger.info("Loading DeBERTa model...")
            self.deberta_model = DesklibAIDetectionModel.from_pretrained(self.model_path)
            self.deberta_model.to(self.device)
            self.deberta_model.eval()
            logger.info(f"DeBERTa model device: {next(self.deberta_model.parameters()).device}")

            # 2. Load the sentiment analysis model (DistilBERT fine-tuned on SST-2)
            logger.info("Loading sentiment model...")
            sentiment_model_name = "distilbert-base-uncased-finetuned-sst-2-english"
            self.sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
            self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)
            self.sentiment_model.to(self.device)
            self.sentiment_model.eval()
            logger.info(f"Sentiment model device: {next(self.sentiment_model.parameters()).device}")

            # 3. Load the XGBoost model
            if os.path.exists(self.xgboost_path):
                logger.info(f"Loading XGBoost model from: {self.xgboost_path}")
                t0 = time.perf_counter()
                self.xgboost_model = xgb.Booster()
                self.xgboost_model.load_model(self.xgboost_path)
                # Select GPU or CPU prediction depending on the hardware.
                # (The "predictor"/"tree_method" params below target XGBoost 1.x;
                # XGBoost 2.x replaces them with a single "device" parameter.)
                if torch.cuda.is_available():
                    logger.info("Setting XGBoost to use GPU predictor")
                    try:
                        self.xgboost_model.set_param({"predictor": "gpu_predictor", "tree_method": "gpu_hist"})
                        logger.info("XGBoost configured to use GPU (gpu_predictor, gpu_hist)")
                    except Exception as ie:
                        logger.warning(f"Failed to set XGBoost GPU params: {ie}")
                else:
                    logger.info("Setting XGBoost to use CPU predictor")
                    try:
                        self.xgboost_model.set_param({"predictor": "cpu_predictor", "tree_method": "hist"})
                    except Exception as ie:
                        logger.warning(f"Failed to set XGBoost CPU params: {ie}")
                t1 = time.perf_counter()
                logger.info(f"✅ XGBoost model loaded in {t1 - t0:.4f}s")
            else:
                logger.warning(f"XGBoost model not found at {self.xgboost_path}, using DeBERTa only")
                self.xgboost_model = None

            if torch.cuda.is_available():
                logger.info(f"GPU detected: {torch.cuda.get_device_name(0)}")

            self.model_loaded = True
            logger.info("✅ All models loaded successfully!")
        except Exception as e:
            logger.error(f"Failed to load models: {e}", exc_info=True)
            self.model_loaded = False
            raise

    def is_loaded(self) -> bool:
        """Check whether all models finished loading."""
        return self.model_loaded

    def get_sentiment_scores(self, text: str) -> List[float]:
        """
        Extract a sentiment score for each sentence using DistilBERT.

        Args:
            text: Input text

        Returns:
            List of polarity scores in [-1, 1], one per sentence
        """
        try:
            # Tokenize into sentences
            sentences = sent_tokenize(text)
            if not sentences:
                return [0.0]  # Neutral if there are no sentences

            scores = []
            start_total = time.perf_counter()
            with torch.no_grad():
                for sentence in sentences:
                    s0 = time.perf_counter()
                    # Tokenize the sentence
                    inputs = self.sentiment_tokenizer(
                        sentence,
                        return_tensors="pt",
                        padding=True,
                        truncation=True,
                        max_length=512
                    )
                    inputs = {k: v.to(self.device) for k, v in inputs.items()}

                    # Get the sentiment prediction
                    outputs = self.sentiment_model(**inputs)
                    logits = outputs.logits
                    probabilities = F.softmax(logits, dim=-1)

                    # Positive-sentiment probability (index 1)
                    pos_prob = probabilities[0][1].item()

                    # Convert to a polarity score: maps [0, 1] to [-1, 1], where 0 is neutral
                    polarity = (pos_prob - 0.5) * 2
                    scores.append(polarity)

                    s1 = time.perf_counter()
                    logger.debug(f"Sentiment sentence processed in {s1 - s0:.4f}s")

            total_time = time.perf_counter() - start_total
            logger.info(f"Extracted sentiment scores for {len(sentences)} sentences in {total_time:.4f}s")
            return scores
        except Exception as e:
            logger.error(f"Error extracting sentiment scores: {e}")
            return [0.0]  # Return neutral on error
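
    # Note: sentences are scored one at a time above. If throughput matters, the
    # loop could be replaced by a single batched call (a sketch, untested):
    #     inputs = self.sentiment_tokenizer(sentences, return_tensors="pt",
    #                                       padding=True, truncation=True, max_length=512)
    #     inputs = {k: v.to(self.device) for k, v in inputs.items()}
    #     probs = F.softmax(self.sentiment_model(**inputs).logits, dim=-1)
    #     scores = ((probs[:, 1] - 0.5) * 2).tolist()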

    def extract_sentiment_features(self, text: str) -> np.ndarray:
        """
        Extract avg_polarity and polarity_variance from text.

        Args:
            text: Input text

        Returns:
            Numpy array with [avg_polarity, polarity_variance]
        """
        start = time.perf_counter()
        sentiment_scores = self.get_sentiment_scores(text)

        # Calculate the two sentiment features
        avg_polarity = float(np.mean(sentiment_scores)) if sentiment_scores else 0.0
        polarity_variance = float(np.var(sentiment_scores)) if len(sentiment_scores) > 1 else 0.0

        duration = time.perf_counter() - start
        logger.info(f"Sentiment features extracted in {duration:.4f}s (avg_polarity={avg_polarity:.4f}, variance={polarity_variance:.4f})")
        return np.array([avg_polarity, polarity_variance], dtype=np.float32)
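
    # Worked example (illustrative numbers): per-sentence polarities
    # [0.8, -0.4, 0.2] give avg_polarity = 0.2 and polarity_variance = 0.24
    # (np.var is the population variance: the mean of squared deviations).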

    def get_deberta_embeddings(self, text: str) -> np.ndarray:
        """
        Get DeBERTa embeddings for text using mean pooling.

        Args:
            text: Input text

        Returns:
            Numpy array of embeddings
        """
        try:
            t_total = time.perf_counter()

            # Tokenize the input
            t0 = time.perf_counter()
            encoded = self.tokenizer(
                text,
                padding='max_length',
                truncation=True,
                max_length=self.max_length,
                return_tensors='pt'
            )
            t1 = time.perf_counter()
            logger.debug(f"Tokenization time: {t1 - t0:.4f}s")

            input_ids = encoded['input_ids'].to(self.device)
            attention_mask = encoded['attention_mask'].to(self.device)

            # Get embeddings
            with torch.no_grad():
                t0 = time.perf_counter()
                outputs = self.deberta_model.model(input_ids=input_ids, attention_mask=attention_mask)
                t1 = time.perf_counter()
                logger.debug(f"Transformer forward pass time: {t1 - t0:.4f}s")
                last_hidden_state = outputs[0]

                # Mean pooling (same masked mean as in the model's forward pass)
                t0 = time.perf_counter()
                input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
                sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, dim=1)
                sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9)
                pooled_output = sum_embeddings / sum_mask
                t1 = time.perf_counter()
                logger.debug(f"Pooling time: {t1 - t0:.4f}s")

            # Convert to numpy
            embeddings = pooled_output.cpu().numpy().flatten()
            total = time.perf_counter() - t_total
            logger.debug(f"Total embedding extraction time: {total:.4f}s")
            return embeddings
        except Exception as e:
            logger.error(f"Error extracting DeBERTa embeddings: {e}", exc_info=True)
            raise
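
    # The returned vector has config.hidden_size entries (1024 for the
    # DeBERTa-v3-large checkpoint named in get_model_info below), so XGBoost
    # sees it concatenated with the two sentiment features: 1026 inputs total.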

    def predict_probability(self, text: str, threshold: float = 0.5) -> Dict[str, Any]:
        """
        Predict whether text is AI-generated using the two-branch architecture.

        Args:
            text: Input text to analyze
            threshold: Classification threshold (default: 0.5)

        Returns:
            Dictionary with probability, label, and sentiment features
        """
        if not self.model_loaded:
            raise RuntimeError("Model not loaded. Cannot perform prediction.")

        try:
            overall_start = time.perf_counter()

            # Extract sentiment features
            logger.info("Extracting sentiment features...")
            sentiment_start = time.perf_counter()
            sentiment_features = self.extract_sentiment_features(text)
            sentiment_time = time.perf_counter() - sentiment_start
            avg_polarity = float(sentiment_features[0])
            polarity_variance = float(sentiment_features[1])
            logger.info(f"Sentiment extraction took {sentiment_time:.4f}s")

            # If XGBoost is available, use the full two-branch pipeline
            if self.xgboost_model is not None:
                logger.info("Using XGBoost two-branch model...")
                embed_start = time.perf_counter()
                # Get DeBERTa embeddings
                deberta_embeddings = self.get_deberta_embeddings(text)
                embed_time = time.perf_counter() - embed_start
                logger.info(f"DeBERTa embedding extraction took {embed_time:.4f}s")

                # Combine features: DeBERTa embeddings + sentiment features
                combined_features = np.concatenate([deberta_embeddings, sentiment_features])

                # Create a DMatrix for XGBoost
                dmatrix = xgb.DMatrix(combined_features.reshape(1, -1))

                # Predict
                xgb_start = time.perf_counter()
                probability = float(self.xgboost_model.predict(dmatrix)[0])
                xgb_time = time.perf_counter() - xgb_start
                logger.info(f"XGBoost prediction took {xgb_time:.4f}s")
            else:
                # Fall back to DeBERTa only
                logger.info("Using DeBERTa model only (XGBoost not found)...")
                encoded = self.tokenizer(
                    text,
                    padding='max_length',
                    truncation=True,
                    max_length=self.max_length,
                    return_tensors='pt'
                )
                input_ids = encoded['input_ids'].to(self.device)
                attention_mask = encoded['attention_mask'].to(self.device)

                with torch.no_grad():
                    t0 = time.perf_counter()
                    outputs = self.deberta_model(input_ids=input_ids, attention_mask=attention_mask)
                    t1 = time.perf_counter()
                    logger.info(f"DeBERTa forward & classification took {t1 - t0:.4f}s")
                    logits = outputs["logits"]
                    probability = torch.sigmoid(logits).item()

            label = 1 if probability >= threshold else 0
            overall_time = time.perf_counter() - overall_start
            logger.info(f"Total prediction pipeline took {overall_time:.4f}s (prob={probability:.4f})")

            return {
                "probability": probability,
                "label": label,
                "classification": "ai" if label == 1 else "human",
                "confidence": probability if label == 1 else (1 - probability),
                "sentiment_features": {
                    "avg_polarity": avg_polarity,
                    "polarity_variance": polarity_variance
                }
            }
        except Exception as e:
            logger.error(f"Prediction error: {e}", exc_info=True)
            raise
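
    # Example (a sketch; assumes the model artifacts loaded successfully):
    #     handler = AIDetectionModelHandler()
    #     result = handler.predict_probability("some text ...", threshold=0.5)
    #     result["classification"]  # -> "ai" or "human"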

    def predict_single_text_xgboost(self, text: str) -> Tuple[float, int]:
        """
        Predict the AI probability and label for a single text
        (XGBoost pipeline with a DeBERTa-only fallback).

        Args:
            text: Input text to analyze

        Returns:
            Tuple of (probability, label) where label is 0 for human, 1 for AI
        """
        try:
            start_total = time.perf_counter()

            # Extract sentiment features
            sentiment_features = self.extract_sentiment_features(text)

            # If XGBoost is available, use the full two-branch pipeline
            if self.xgboost_model is not None:
                embed_start = time.perf_counter()
                # Get DeBERTa embeddings
                deberta_embeddings = self.get_deberta_embeddings(text)
                embed_time = time.perf_counter() - embed_start
                logger.info(f"DeBERTa embedding extraction took {embed_time:.4f}s")

                # Combine features: DeBERTa embeddings + sentiment features
                combined_features = np.concatenate([deberta_embeddings, sentiment_features])

                # Create a DMatrix for XGBoost
                dmatrix = xgb.DMatrix(combined_features.reshape(1, -1))

                # Predict
                xgb_start = time.perf_counter()
                probability = float(self.xgboost_model.predict(dmatrix)[0])
                xgb_time = time.perf_counter() - xgb_start
                logger.info(f"XGBoost prediction (single) took {xgb_time:.4f}s")
            else:
                # Fall back to DeBERTa only
                encoded = self.tokenizer(
                    text,
                    padding='max_length',
                    truncation=True,
                    max_length=self.max_length,
                    return_tensors='pt'
                )
                input_ids = encoded['input_ids'].to(self.device)
                attention_mask = encoded['attention_mask'].to(self.device)

                with torch.no_grad():
                    t0 = time.perf_counter()
                    outputs = self.deberta_model(input_ids=input_ids, attention_mask=attention_mask)
                    t1 = time.perf_counter()
                    logger.info(f"DeBERTa forward (single) took {t1 - t0:.4f}s")
                    logits = outputs["logits"]
                    probability = torch.sigmoid(logits).item()

            label = 1 if probability >= 0.5 else 0
            total = time.perf_counter() - start_total
            logger.info(f"predict_single_text_xgboost total time: {total:.4f}s")
            return probability, label
        except Exception as e:
            logger.error(f"Single text prediction error: {e}", exc_info=True)
            raise

    def detect_mixed_text_chunk_based(self, text: str, chunk_size: int = 4, overlap: int = 1, min_chunk_length: int = 50) -> Dict[str, Any]:
        """
        Mixed-text detection using chunk-based analysis that influences the overall probability.

        Args:
            text: Input text string
            chunk_size: Nominal sentences per chunk, echoed in the result; the
                actual size is chosen dynamically from the sentence count (see Note)
            overlap: Number of sentences to overlap between chunks (default: 1)
            min_chunk_length: Minimum character length for a chunk to be analyzed

        Returns:
            Dictionary with prediction results and analysis details

        Note:
            Input validation: text must be 80-2000 words. Dynamic chunking:
            texts of 4-5 sentences are analyzed as a whole; otherwise the chunk
            size scales with length:
            - 6-10 sentences: 3 sentences per chunk
            - 11-20 sentences: 4 sentences per chunk
            - 21-30 sentences: 5 sentences per chunk
            - 31+ sentences: 6 sentences per chunk
            Overlapping chunks capture transitions between AI and human content.
        """
        # Get the overall prediction for the full text
        overall_prob, overall_label = self.predict_single_text_xgboost(text)

        # Split the text into sentences
        sentences = sent_tokenize(text)

        # Validate the input text length (80-2000 words)
        total_words = len(text.split())
        if total_words < 80:
            return {
                'prediction': 'Human' if overall_label == 0 else 'AI',
                'confidence': abs(overall_prob - 0.5) * 2,
                'is_mixed': False,
                'reason': f'Text too short for analysis ({total_words} words, minimum 80 words required)',
                'overall_probability': overall_prob,
                'modified_probability': overall_prob,
                'chunk_analysis': []
            }
        elif total_words > 2000:
            return {
                'prediction': 'Human' if overall_label == 0 else 'AI',
                'confidence': abs(overall_prob - 0.5) * 2,
                'is_mixed': False,
                'reason': f'Text too long for analysis ({total_words} words, maximum 2000 words allowed)',
                'overall_probability': overall_prob,
                'modified_probability': overall_prob,
                'chunk_analysis': []
            }

        # Compute sentence character offsets (start/end) to map back to the original text
        sentence_offsets: List[Tuple[int, int]] = []
        search_start = 0
        for sent in sentences:
            # Find the sentence occurrence starting from search_start
            idx = text.find(sent, search_start)
            if idx == -1:
                # Fallback: assume the sentence starts right after the previous one
                idx = search_start
            start_char = idx
            end_char = start_char + len(sent)
            sentence_offsets.append((start_char, end_char))
            search_start = end_char
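
        # e.g. for text "Hi there. Bye." sent_tokenize yields ["Hi there.", "Bye."]
        # and sentence_offsets becomes [(0, 9), (10, 14)] (illustrative values).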

        # Dynamic chunking based on the total sentence count
        total_sentences = len(sentences)

        # For 4-5 sentences, analyze as a whole (no chunking)
        if total_sentences <= 5:
            return {
                'prediction': 'Human' if overall_label == 0 else 'AI',
                'confidence': abs(overall_prob - 0.5) * 2,
                'is_mixed': False,
                'reason': f'Analyzing {total_sentences} sentences as whole (4-5 sentence range)',
                'overall_probability': overall_prob,
                'modified_probability': overall_prob,
                'chunk_analysis': []
            }

        # Dynamic chunk size based on total sentences
        if total_sentences <= 10:
            dynamic_chunk_size = 3
        elif total_sentences <= 20:
            dynamic_chunk_size = 4
        elif total_sentences <= 30:
            dynamic_chunk_size = 5
        else:
            dynamic_chunk_size = 6  # For very long texts

        # Ensure we have enough sentences for at least 2 chunks
        if total_sentences < dynamic_chunk_size * 2:
            return {
                'prediction': 'Human' if overall_label == 0 else 'AI',
                'confidence': abs(overall_prob - 0.5) * 2,
                'is_mixed': False,
                'reason': f'Text too short for chunk analysis ({total_sentences} sentences, need at least {dynamic_chunk_size * 2})',
                'overall_probability': overall_prob,
                'modified_probability': overall_prob,
                'chunk_analysis': []
            }
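
        # Example: a 15-sentence text gets dynamic_chunk_size = 4; with overlap = 1
        # the loop below starts chunks at sentence indices 0, 3, 6, 9 (step = 4 - 1).
        # Sentences 13-14 end up covered by no chunk and fall back to the
        # nearest-chunk logic further below.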

        # Create overlapping chunks and retain sentence index ranges
        chunks = []  # textual chunks (for backward compatibility)
        chunk_sentence_ranges: List[Tuple[int, int]] = []  # inclusive start, inclusive end sentence idx
        chunk_predictions: List[Tuple[float, int]] = []
        chunk_probabilities: List[float] = []

        logger.info(f"Analyzing text with {total_sentences} sentences using dynamic chunk size of {dynamic_chunk_size}...")
        for i in range(0, len(sentences) - dynamic_chunk_size + 1, dynamic_chunk_size - overlap):
            # Build the chunk from consecutive sentences
            start_idx = i
            end_idx = i + dynamic_chunk_size - 1
            chunk_sentences = sentences[start_idx:end_idx + 1]
            chunk_text = ' '.join(chunk_sentences)

            # Only analyze chunks that meet the minimum length requirement
            if len(chunk_text.strip()) >= min_chunk_length:
                chunks.append(chunk_text)
                chunk_sentence_ranges.append((start_idx, end_idx))
                # Analyze this chunk
                prob, label = self.predict_single_text_xgboost(chunk_text)
                chunk_predictions.append((prob, label))
                chunk_probabilities.append(prob)
                logger.info(f"  Chunk {len(chunks)}: {chunk_text[:60]}... → {'AI' if label == 1 else 'Human'} ({prob:.3f})")

        if len(chunk_predictions) < 2:
            return {
                'prediction': 'Human' if overall_label == 0 else 'AI',
                'confidence': abs(overall_prob - 0.5) * 2,
                'is_mixed': False,
                'reason': 'Too few chunks for mixed analysis',
                'overall_probability': overall_prob,
                'modified_probability': overall_prob,
                'chunk_probabilities': chunk_probabilities,
                'raw_chunks': [],
                'sentence_analysis': [],
                'merged_spans': [],
                'chunk_analysis': chunk_predictions
            }

        # Count human vs AI chunks
        human_chunks = sum(1 for _, label in chunk_predictions if label == 0)
        ai_chunks = sum(1 for _, label in chunk_predictions if label == 1)
        total_chunks = len(chunk_predictions)

        # Mixed-text detection logic
        is_mixed = human_chunks > 0 and ai_chunks > 0
        mixed_ratio = min(human_chunks, ai_chunks) / total_chunks
        chunk_avg_prob = float(np.mean(chunk_probabilities)) if chunk_probabilities else overall_prob
        chunk_label = 'AI' if chunk_avg_prob >= 0.5 else 'Human'

        logger.info("Chunk Analysis Summary:")
        logger.info(f"  Total chunks analyzed: {total_chunks}")
        logger.info(f"  Human chunks: {human_chunks}")
        logger.info(f"  AI chunks: {ai_chunks}")
        logger.info(f"  Mixed ratio: {mixed_ratio:.2f}")
        logger.info(f"  Average chunk probability: {chunk_avg_prob:.3f}")
        logger.info(f"  Chunk-derived label: {chunk_label}")

        if is_mixed:
            final_prediction = 'Mixed'
            modified_prob = chunk_avg_prob
            confidence = 1.0 - mixed_ratio
            logger.info("  → MIXED TEXT DETECTED (chunk-based)")
        else:
            final_prediction = chunk_label
            modified_prob = chunk_avg_prob
            confidence = abs(chunk_avg_prob - 0.5) * 2
            logger.info(f"  → Pure {chunk_label} text based on chunk probabilities")
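
        # Example: 2 human chunks and 3 AI chunks → is_mixed = True,
        # mixed_ratio = 2 / 5 = 0.4, confidence = 1 - 0.4 = 0.6, and the
        # reported probability is the mean of the five chunk probabilities.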

        # Build detailed raw_chunks with character offsets
        raw_chunks: List[Dict[str, Any]] = []
        for idx, ((prob, label), (sent_start, sent_end)) in enumerate(zip(chunk_predictions, chunk_sentence_ranges)):
            # Map sentence indices to character offsets
            start_char = sentence_offsets[sent_start][0] if sent_start < len(sentence_offsets) else 0
            end_char = sentence_offsets[sent_end][1] if sent_end < len(sentence_offsets) else len(text)
            chunk_text = text[start_char:end_char]
            raw_chunks.append({
                'chunk_index': idx,
                'start_char': start_char,
                'end_char': end_char,
                'text': chunk_text,
                'probability': float(prob),
                'label': 'ai' if label == 1 else 'human',
                'sentence_range': [sent_start, sent_end]
            })

        # Compute per-sentence aggregated probabilities and labels from the covering chunks
        sentence_analysis: List[Dict[str, Any]] = []
        for si in range(len(sentences)):
            # Find the chunks covering this sentence
            covering_probs: List[float] = []
            covering_labels: List[int] = []
            for (prob, label), (cs, ce) in zip(chunk_predictions, chunk_sentence_ranges):
                if cs <= si <= ce:
                    covering_probs.append(prob)
                    covering_labels.append(label)

            if covering_probs:
                avg_prob = float(np.mean(covering_probs))
                # Use the average probability as the primary signal, but also
                # consider the chunk-label majority with a safety threshold.
                # AI labeling requires a slightly higher probability threshold
                # to reduce false positives from noisy chunks.
                label_frac = float(np.mean(covering_labels)) if covering_labels else 0.0
                AI_PROB_THRESHOLD = 0.55
                # If the average probability is confidently AI, mark as AI.
                if avg_prob >= AI_PROB_THRESHOLD:
                    sentence_label = 'ai'
                # Otherwise, if the majority of covering chunks are labeled AI and
                # the probability is at least 0.5, mark as AI (minority case).
                elif label_frac > 0.5 and avg_prob >= 0.5:
                    sentence_label = 'ai'
                else:
                    sentence_label = 'human'
            else:
                # No covering chunks: use a nearest-chunk fallback (prefer the previous
                # chunk, otherwise the next chunk). This avoids falling back to the
                # global overall_prob, which would make trailing sentences inherit
                # the global label.
                nearest_prob = None
                nearest_label = None
                # Find the previous chunk index (the last chunk that ends before this sentence)
                prev_idx = None
                for idx, (cs, ce) in enumerate(chunk_sentence_ranges):
                    if ce < si:
                        prev_idx = idx
                if prev_idx is not None:
                    nearest_prob, nearest_label = chunk_predictions[prev_idx]
                else:
                    # Find the next chunk index (the first chunk that starts after this sentence)
                    next_idx = None
                    for idx, (cs, ce) in enumerate(chunk_sentence_ranges):
                        if cs > si:
                            next_idx = idx
                            break
                    if next_idx is not None:
                        nearest_prob, nearest_label = chunk_predictions[next_idx]

                if nearest_prob is not None:
                    avg_prob = float(nearest_prob)
                    sentence_label = 'ai' if nearest_label == 1 else 'human'
                else:
                    # Fall back to the overall prediction if there are truly no chunks
                    avg_prob = overall_prob
                    sentence_label = 'ai' if overall_label == 1 else 'human'

            start_c, end_c = sentence_offsets[si] if si < len(sentence_offsets) else (0, 0)
            sentence_analysis.append({
                'sentence_index': si,
                'start_char': start_c,
                'end_char': end_c,
                'text': sentences[si],
                'avg_probability': avg_prob,
                'label': sentence_label
            })

        # Merge adjacent sentences with the same label into non-overlapping spans
        # for easy frontend rendering
        merged_spans: List[Dict[str, Any]] = []
        if sentence_analysis:
            cur = sentence_analysis[0]
            cur_start = cur['start_char']
            cur_end = cur['end_char']
            cur_label = cur['label']
            cur_probs = [cur['avg_probability']]
            for s in sentence_analysis[1:]:
                if s['label'] == cur_label:
                    # Extend the current span
                    cur_end = s['end_char']
                    cur_probs.append(s['avg_probability'])
                else:
                    merged_spans.append({
                        'start_char': cur_start,
                        'end_char': cur_end,
                        'label': cur_label,
                        'avg_probability': float(np.mean(cur_probs))
                    })
                    # Start a new span
                    cur_start = s['start_char']
                    cur_end = s['end_char']
                    cur_label = s['label']
                    cur_probs = [s['avg_probability']]
            # Append the final span
            merged_spans.append({
                'start_char': cur_start,
                'end_char': cur_end,
                'label': cur_label,
                'avg_probability': float(np.mean(cur_probs))
            })
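
        # Example: sentence labels [human, human, ai, ai, human] collapse into three
        # spans (human, ai, human), each carrying the mean probability of its sentences.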

        return {
            'prediction': final_prediction,
            'confidence': confidence,
            'is_mixed': is_mixed,
            'mixed_ratio': mixed_ratio,
            'human_chunks': human_chunks,
            'ai_chunks': ai_chunks,
            'total_chunks': total_chunks,
            'overall_probability': overall_prob,
            'modified_probability': modified_prob,
            'chunk_probabilities': chunk_probabilities,
            'chunk_analysis': chunk_predictions,
            'raw_chunks': raw_chunks,
            'sentence_analysis': sentence_analysis,
            'merged_spans': merged_spans,
            'chunk_size': chunk_size,
            'overlap': overlap
        }

    def detect_ai(self, text: str) -> Dict[str, Any]:
        """
        AI detection with chunk-based mixed-text analysis.

        Args:
            text: Input text

        Returns:
            Detection results with sentiment features and mixed-text analysis
        """
        start_time = time.perf_counter()

        # Use chunk-based detection for better mixed-text handling
        chunk_result = self.detect_mixed_text_chunk_based(text)

        # Get sentiment features for the explanation
        sentiment_features = self.extract_sentiment_features(text)
        avg_pol = float(sentiment_features[0])
        pol_var = float(sentiment_features[1])

        # Map confidence to a certainty phrase
        confidence_pct = chunk_result["confidence"] * 100
        prediction = chunk_result["prediction"]
        if confidence_pct > 90:
            certainty = "very high confidence"
        elif confidence_pct > 75:
            certainty = "high confidence"
        elif confidence_pct > 60:
            certainty = "moderate confidence"
        else:
            certainty = "low confidence"

        # Generate an explanation based on the prediction type
        if prediction == "Mixed":
            explanation = "This text appears to be a mixture of AI-generated and human-authored text."
            explanation += " This mixed composition suggests the text may have been collaboratively written or heavily edited."
            # Add sentiment insights for mixed text
            if pol_var > 0.60:
                explanation += " High emotional variation across sections indicates significant style differences between parts."
            elif pol_var >= 0.36:
                explanation += " Moderate emotional variation suggests different writing styles in various sections."
            else:
                explanation += " Low emotional variation may indicate consistent editing or similar writing styles throughout."
        elif prediction == "AI":
            explanation = f"This text is classified as AI-Generated with {certainty}."
            explanation += " The text shows patterns typically associated with AI-generated writing, including uniform structure or predictable phrasing."
            if pol_var <= 0.10:
                explanation += " Very low emotional variation, which is common in more structured or machine-generated texts."
            elif pol_var <= 0.35:
                explanation += " Low emotional variation, which may align with AI patterns but can also occur in formal human writing."
            elif pol_var <= 0.60:
                explanation += " Moderate emotional variation, which is less typical for AI but still possible depending on the prompt or model."
            else:
                explanation += " High emotional variation, which is uncommon in AI outputs but may occur in certain complex or narrative prompts."
        else:  # Human
            explanation = f"This text is classified as Human-Authored with {certainty}."
            explanation += " The text shows patterns frequently observed in human writing, such as natural variations and flexible sentence structures."
            if pol_var > 0.60:
                explanation += " High emotional variation, which often reflects expressive or opinionated writing."
            elif pol_var >= 0.36:
                explanation += " Moderate emotional variation, which shows natural shifts in tone."
            elif pol_var >= 0.11:
                explanation += " Low emotional variation, which may indicate formal or academic writing."
            else:
                explanation += " Very low emotional variation, which indicates a consistent tone with a focused perspective."

        # Convert the prediction to the classification format for backward compatibility
        classification_map = {"AI": "ai", "Human": "human", "Mixed": "mixed"}
        classification = classification_map.get(prediction, "unknown")

        return {
            "classification": classification,
            "prediction": prediction,
            "probability": chunk_result["modified_probability"],
            "confidence": confidence_pct,
            "explanation": explanation,
            "sentiment_features": {
                "avg_polarity": avg_pol,
                "polarity_variance": pol_var
            },
            "mixed_analysis": {
                "is_mixed": chunk_result["is_mixed"],
                "mixed_ratio": chunk_result.get("mixed_ratio", 0),
                "human_chunks": chunk_result.get("human_chunks", 0),
                "ai_chunks": chunk_result.get("ai_chunks", 0),
                "total_chunks": chunk_result.get("total_chunks", 0),
                "overall_probability": chunk_result["overall_probability"],
                "modified_probability": chunk_result["modified_probability"]
            },
            "raw_chunks": chunk_result.get("raw_chunks", []),
            "sentence_analysis": chunk_result.get("sentence_analysis", []),
            "merged_spans": chunk_result.get("merged_spans", []),
            "modelProcessingTime": time.perf_counter() - start_time  # elapsed seconds for this call
        }

    def analyze_text(self, text: str) -> Dict[str, Any]:
        """
        Comprehensive text analysis combining AI detection with sentiment features.

        Args:
            text: Input text to analyze

        Returns:
            Complete analysis results with model-based sentiment features
        """
        start_time = time.perf_counter()

        # Validate the input text length (80-2000 words)
        total_words = len(text.split())
        if total_words < 80:
            raise ValueError(f"Text too short for analysis ({total_words} words, minimum 80 words required)")
        elif total_words > 2000:
            raise ValueError(f"Text too long for analysis ({total_words} words, maximum 2000 words allowed)")

        # Get AI detection results (includes sentiment features from the model)
        ai_detection = self.detect_ai(text)
        mixed_analysis = ai_detection.get("mixed_analysis") or {}
        modified_prob = mixed_analysis.get("modified_probability")
        overall_prob = mixed_analysis.get("overall_probability")

        # Pick the first numeric probability available, in priority order
        primary_probability = None
        for candidate in (modified_prob, overall_prob, ai_detection.get("probability")):
            if isinstance(candidate, (int, float)):
                primary_probability = float(candidate)
                break
        if primary_probability is None:
            primary_probability = 0.0

        ai_prob = max(0.0, min(1.0, primary_probability))
        human_prob = 1.0 - ai_prob
        probability_breakdown = {
            "ai": ai_prob,
            "human": human_prob
        }

        model_sentiment = ai_detection.get("sentiment_features", {})

        # Perform basic text analysis
        words = text.split()
        sentences = [s.strip() for s in text.replace('!', '.').replace('?', '.').split('.') if s.strip()]

        # Calculate basic metrics
        word_count = len(words)
        sentence_count = len(sentences)
        avg_word_length = np.mean([len(w) for w in words]) if words else 0
        avg_sentence_length = word_count / sentence_count if sentence_count > 0 else 0

        # Determine style attributes based on the AI probability and text metrics
        is_ai = ai_detection["classification"] == "ai"
        is_mixed = ai_detection["classification"] == "mixed"

        # Handle the different prediction types
        if is_mixed:
            formality = "mixed"
            complexity = "variable"
            tone = "Mixed (AI/Human)"
            audience = "Variable"
        elif is_ai:
            formality = "formal" if ai_prob > 0.7 else "neutral"
            complexity = "complex" if avg_word_length > 6 else "moderate"
            tone = "Professional"
            audience = "General to Academic"
        else:
            formality = "casual" if avg_word_length < 5 else "neutral"
            complexity = "simple" if avg_sentence_length < 15 else "moderate"
            tone = "Conversational"
            audience = "General Public"

        # Generate insights based on the detection results
        insights = []
        if is_mixed and ai_detection["confidence"] > 60:
            insights.append({
                "type": "observation",
                "title": "Mixed Content Detected",
                "description": f"This text contains both AI-generated and human-authored sections ({ai_detection['confidence']:.1f}% confidence).",
                "suggestion": "Consider reviewing the text for consistency and ensuring all sections align with your intended voice and style."
            })
            insights.append({
                "type": "observation",
                "title": "Content Composition",
                "description": f"Analysis found {mixed_analysis.get('human_chunks', 0)} human-like sections and {mixed_analysis.get('ai_chunks', 0)} AI-like sections.",
                "suggestion": "The mixed nature suggests collaborative writing or heavy editing. Consider standardizing the writing style throughout."
            })
        elif is_ai and ai_detection["confidence"] > 75:
            insights.append({
                "type": "observation",
                "title": "AI-Generated Content Detected",
                "description": f"This text shows strong indicators associated with AI-generated writing ({ai_detection['confidence']:.1f}% confidence).",
                "suggestion": "Consider adding personal insights, varied sentence structures, or unique perspectives to achieve a more distinctive voice."
            })
        elif not is_ai and ai_detection["confidence"] > 75:
            insights.append({
                "type": "strength",
                "title": "Human Writing Characteristics",
                "description": f"The text shows several features commonly found in human-authored writing ({ai_detection['confidence']:.1f}% confidence)."
            })

        # Sentence variety analysis
        if sentence_count > 2:
            sentence_lengths = [len(s.split()) for s in sentences]
            std_dev = np.std(sentence_lengths)
            if std_dev < 3:
                insights.append({
                    "type": "improvement",
                    "title": "Sentence Variety",
                    "description": "Sentences have similar lengths; this uniform pattern may indicate AI generation.",
                    "suggestion": "Consider varying sentence length to create a more natural flow."
                })
            else:
                insights.append({
                    "type": "strength",
                    "title": "Good Sentence Variety",
                    "description": "Text shows natural variation in sentence structure."
                })

        # Derive emotions from the model's average sentiment polarity (data-driven ranges)
        avg_polarity = model_sentiment.get("avg_polarity", 0)
        emotions = []
        if avg_polarity >= 0.71:
            emotions.append({"emotion": "very_positive", "score": min(abs(avg_polarity), 1.0), "intensity": "high"})
        elif avg_polarity >= 0.30:
            emotions.append({"emotion": "positive", "score": min(abs(avg_polarity), 1.0), "intensity": "medium"})
        elif avg_polarity >= -0.29:
            emotions.append({"emotion": "neutral", "score": 0.8, "intensity": "medium"})
        else:
            emotions.append({"emotion": "negative", "score": min(abs(avg_polarity), 1.0), "intensity": "high"})

        # Construct the full analysis response with model sentiment features
        polarity_variance = model_sentiment.get("polarity_variance", 0)
        end_time = time.perf_counter()
        processing_seconds = round(end_time - start_time, 3)  # exact seconds (millisecond precision)
        logger.info(f"Model processing time for analyze_text: {processing_seconds:.3f}s")

        return {
            "advancedSentiment": {
                "emotions": emotions,
                "confidence": 70 + (ai_detection["confidence"] * 0.3),
                "context": f"The text appears to be {'AI-Generated' if ai_detection['classification'] == 'ai' else 'Human-Authored'} based on linguistic patterns and sentiment analysis.",
                "avg_polarity": model_sentiment.get("avg_polarity", 0),
                "polarity_variance": polarity_variance
            },
            "topics": [
                {
                    "topic": "General Content",
                    "relevance": 0.8,
                    "keywords": words[:5] if len(words) >= 5 else words
                }
            ],
            "writingStyle": {
                "tone": tone,
                "formality": formality,
                "complexity": complexity,
                "style": [formality, complexity, tone],
                "audience": audience,
                "sentiment_consistency": "very_low" if polarity_variance <= 0.10 else "low" if polarity_variance <= 0.35 else "moderate" if polarity_variance <= 0.60 else "high"
            },
            "insights": insights,
            "plagiarismRisk": {
                "score": int(ai_prob * 100) if is_ai else (int(ai_prob * 70) if is_mixed else 10),
                "level": "high" if is_ai and ai_prob > 0.8 else "medium" if (is_ai or is_mixed) else "low",
                "details": f"{'High' if is_ai else 'Moderate' if is_mixed else 'Low'} similarity to AI-generated patterns detected."
            },
            "contentQuality": {
                "overall": int(85 - (ai_prob * 20)) if is_ai else (int(80 - (ai_prob * 15)) if is_mixed else 90),
                "clarity": int(90 - (ai_prob * 10)) if not is_mixed else int(85 - (ai_prob * 8)),
                "coherence": int(88 - (ai_prob * 8)) if not is_mixed else int(82 - (ai_prob * 6)),
                "engagement": int(75 - (ai_prob * 25)) if not is_mixed else int(70 - (ai_prob * 20)),
                "originality": int(60 - (ai_prob * 40)) if is_ai else (int(70 - (ai_prob * 30)) if is_mixed else 85)
            },
            "aiOrHuman": ai_detection["classification"],
            "aiOrHumanConfidence": ai_detection["confidence"],
            "aiOrHumanExplanation": ai_detection["explanation"],
            "mixedAnalysis": mixed_analysis,
            "probabilityBreakdown": probability_breakdown,
            "rawChunks": ai_detection.get("raw_chunks", []),
            "sentenceAnalysis": ai_detection.get("sentence_analysis", []),
            "mergedSpans": ai_detection.get("merged_spans", []),
            "modelProcessingTime": processing_seconds
        }

    def get_model_info(self) -> Dict[str, Any]:
        """Get information about the loaded models."""
        return {
            "model_loaded": self.model_loaded,
            "model_path": self.model_path,
            "device": str(self.device),
            "max_length": self.max_length,
            "architecture": "Two-Branch (DeBERTa + Sentiment Features)",
            "primary_model": "DeBERTa-v3-large (desklib/ai-text-detector-v1.01)",
            "sentiment_model": "DistilBERT-SST-2",
            "classifier": "XGBoost" if self.xgboost_model is not None else "DeBERTa Linear",
            "features": [
                "DeBERTa embeddings (1024 dimensions)",
                "Average sentiment polarity",
                "Sentiment polarity variance"
            ],
            "description": "Two-branch model for detecting AI-Generated vs Human-Authored text using DeBERTa semantic embeddings combined with sentiment features"
        }
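

if __name__ == "__main__":
    # Minimal smoke test (a sketch; assumes the model artifacts exist at the
    # default path and that the sample meets the 80-2000 word requirement).
    logging.basicConfig(level=logging.INFO)
    handler = AIDetectionModelHandler()
    sample = " ".join(["This is a sample sentence written for the smoke test."] * 20)
    result = handler.analyze_text(sample)
    print(result["aiOrHuman"], result["aiOrHumanConfidence"])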
| # |