Spaces:
Configuration error
Configuration error
feat: Introduce new backend architecture with notebooks, sources, chat, and CLaRa models, alongside database schema and updated deployment scripts, while removing old frontend, deployment files, and previous backend components.
88f8604
| """ | |
| Antigravity Notebook - Streamlit UI | |
| NotebookLM-style interface for the Antigravity Notebook system. | |
| """ | |
| import streamlit as st | |
| import requests | |
| from typing import List, Dict, Optional | |
| import plotly.graph_objects as go | |
| from datetime import datetime | |
| # API Configuration | |
| API_BASE_URL = "http://localhost:8000" | |
| def init_session_state(): | |
| """Initialize session state variables""" | |
| if "selected_notebook_id" not in st.session_state: | |
| st.session_state.selected_notebook_id = None | |
| if "chat_messages" not in st.session_state: | |
| st.session_state.chat_messages = [] | |
| if "notebooks" not in st.session_state: | |
| st.session_state.notebooks = [] | |
| def api_call(method: str, endpoint: str, **kwargs): | |
| """Make API call with error handling""" | |
| url = f"{API_BASE_URL}{endpoint}" | |
| try: | |
| response = requests.request(method, url, **kwargs) | |
| response.raise_for_status() | |
| return response.json() if response.text else None | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"API Error: {e}") | |
| return None | |
| def load_notebooks() -> List[Dict]: | |
| """Load all notebooks""" | |
| data = api_call("GET", "/notebooks/") | |
| return data if data else [] | |
| def create_notebook(name: str, description: str = ""): | |
| """Create a new notebook""" | |
| return api_call("POST", "/notebooks/", json={ | |
| "name": name, | |
| "description": description | |
| }) | |
| def get_notebook_details(notebook_id: str) -> Optional[Dict]: | |
| """Get detailed notebook information""" | |
| return api_call("GET", f"/notebooks/{notebook_id}") | |
| def get_notebook_stats(notebook_id: str) -> Optional[Dict]: | |
| """Get notebook statistics""" | |
| return api_call("GET", f"/notebooks/{notebook_id}/stats") | |
| def upload_pdf(notebook_id: str, file): | |
| """Upload a PDF file""" | |
| files = {"file": (file.name, file, "application/pdf")} | |
| return api_call("POST", f"/sources/notebooks/{notebook_id}/sources/upload", files=files) | |
| def add_url_source(notebook_id: str, url: str, title: str = None): | |
| """Add a URL source""" | |
| return api_call("POST", f"/sources/notebooks/{notebook_id}/sources/url", json={ | |
| "url": url, | |
| "title": title | |
| }) | |
| def add_text_source(notebook_id: str, content: str, title: str): | |
| """Add text source""" | |
| return api_call("POST", f"/sources/notebooks/{notebook_id}/sources/text", json={ | |
| "content": content, | |
| "title": title | |
| }) | |
| def delete_source(source_id: str): | |
| """Delete a source""" | |
| return api_call("DELETE", f"/sources/sources/{source_id}") | |
| def query_notebook(notebook_id: str, query: str) -> Optional[Dict]: | |
| """Query the notebook""" | |
| return api_call("POST", f"/chat/notebooks/{notebook_id}/chat", json={ | |
| "query": query, | |
| "include_history": True | |
| }) | |
| def get_chat_history(notebook_id: str) -> List[Dict]: | |
| """Get chat history""" | |
| data = api_call("GET", f"/chat/notebooks/{notebook_id}/messages") | |
| return data if data else [] | |
| def render_memory_gauge(stats: Dict): | |
| """Render memory usage gauge""" | |
| usage_percent = stats.get("context_usage_percent", 0) | |
| fig = go.Figure(go.Indicator( | |
| mode="gauge+number+delta", | |
| value=usage_percent, | |
| title={'text': "Context Usage"}, | |
| delta={'reference': 100}, | |
| gauge={ | |
| 'axis': {'range': [None, 100]}, | |
| 'bar': {'color': "darkblue"}, | |
| 'steps': [ | |
| {'range': [0, 50], 'color': "lightgreen"}, | |
| {'range': [50, 80], 'color': "yellow"}, | |
| {'range': [80, 100], 'color': "orange"} | |
| ], | |
| 'threshold': { | |
| 'line': {'color': "red", 'width': 4}, | |
| 'thickness': 0.75, | |
| 'value': 100 | |
| } | |
| } | |
| )) | |
| fig.update_layout(height=200, margin=dict(l=20, r=20, t=40, b=20)) | |
| st.plotly_chart(fig, use_container_width=True) | |
| # Stats details | |
| st.caption(f"π {stats.get('total_tokens', 0):,} / {stats.get('max_context_tokens', 0):,} tokens") | |
| if stats.get('can_fit_full_context', True): | |
| st.success("β Full notebook fits in context!") | |
| else: | |
| st.warning("β οΈ Using selective retrieval") | |
| def render_sidebar(): | |
| """Render the left sidebar with notebook and source management""" | |
| with st.sidebar: | |
| st.title("π Antigravity Notebook") | |
| # Notebook selector | |
| st.subheader("Notebooks") | |
| notebooks = load_notebooks() | |
| st.session_state.notebooks = notebooks | |
| if notebooks: | |
| notebook_options = {nb["name"]: nb["id"] for nb in notebooks} | |
| selected_name = st.selectbox( | |
| "Select Notebook", | |
| options=list(notebook_options.keys()), | |
| key="notebook_selector" | |
| ) | |
| st.session_state.selected_notebook_id = notebook_options[selected_name] | |
| else: | |
| st.info("No notebooks yet. Create one below!") | |
| st.session_state.selected_notebook_id = None | |
| # Create new notebook | |
| with st.expander("β Create New Notebook"): | |
| new_name = st.text_input("Name", key="new_notebook_name") | |
| new_desc = st.text_area("Description (optional)", key="new_notebook_desc") | |
| if st.button("Create Notebook"): | |
| if new_name: | |
| result = create_notebook(new_name, new_desc) | |
| if result: | |
| st.success(f"Created: {new_name}") | |
| st.rerun() | |
| else: | |
| st.error("Name is required") | |
| st.divider() | |
| # Sources management (only if notebook selected) | |
| if st.session_state.selected_notebook_id: | |
| notebook_id = st.session_state.selected_notebook_id | |
| # Get notebook details | |
| details = get_notebook_details(notebook_id) | |
| stats = get_notebook_stats(notebook_id) | |
| if details and stats: | |
| st.subheader("π Sources") | |
| # Display sources | |
| sources = details.get("sources", []) | |
| if sources: | |
| for source in sources: | |
| with st.container(): | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| icon = {"pdf": "π", "url": "π", "text": "π"}.get( | |
| source["source_type"], "π" | |
| ) | |
| st.write(f"{icon} {source['filename']}") | |
| st.caption(f"{source['total_tokens']} tokens") | |
| with col2: | |
| if st.button("ποΈ", key=f"delete_{source['id']}"): | |
| delete_source(source['id']) | |
| st.rerun() | |
| st.divider() | |
| else: | |
| st.info("No sources yet. Add some below!") | |
| # Add sources | |
| st.subheader("β Add Source") | |
| tab1, tab2, tab3 = st.tabs(["π PDF", "π URL", "π Text"]) | |
| with tab1: | |
| uploaded_file = st.file_uploader( | |
| "Upload PDF", | |
| type=["pdf"], | |
| key="pdf_uploader" | |
| ) | |
| if uploaded_file and st.button("Upload PDF"): | |
| with st.spinner("Processing PDF..."): | |
| result = upload_pdf(notebook_id, uploaded_file) | |
| if result: | |
| st.success("PDF uploaded!") | |
| st.rerun() | |
| with tab2: | |
| url_input = st.text_input("URL", key="url_input") | |
| url_title = st.text_input("Title (optional)", key="url_title") | |
| if st.button("Add URL"): | |
| if url_input: | |
| with st.spinner("Scraping URL..."): | |
| result = add_url_source( | |
| notebook_id, | |
| url_input, | |
| url_title if url_title else None | |
| ) | |
| if result: | |
| st.success("URL added!") | |
| st.rerun() | |
| else: | |
| st.error("URL is required") | |
| with tab3: | |
| text_title = st.text_input("Title", key="text_title") | |
| text_content = st.text_area("Content", key="text_content", height=150) | |
| if st.button("Add Text"): | |
| if text_title and text_content: | |
| with st.spinner("Processing text..."): | |
| result = add_text_source(notebook_id, text_content, text_title) | |
| if result: | |
| st.success("Text added!") | |
| st.rerun() | |
| else: | |
| st.error("Title and content are required") | |
| st.divider() | |
| # Memory gauge | |
| st.subheader("π§ Memory Usage") | |
| render_memory_gauge(stats) | |
| def render_main_chat(): | |
| """Render the main chat interface""" | |
| st.title("π¬ Chat with Your Notebook") | |
| if not st.session_state.selected_notebook_id: | |
| st.info("π Select or create a notebook to start chatting") | |
| return | |
| notebook_id = st.session_state.selected_notebook_id | |
| # Get notebook details | |
| details = get_notebook_details(notebook_id) | |
| if not details: | |
| st.error("Failed to load notebook") | |
| return | |
| st.subheader(f"π {details['name']}") | |
| if details.get('description'): | |
| st.caption(details['description']) | |
| # Check if sources exist | |
| if not details.get('sources'): | |
| st.warning("β οΈ No sources in this notebook. Add some sources to start chatting!") | |
| return | |
| st.divider() | |
| # Load chat history | |
| chat_history = get_chat_history(notebook_id) | |
| # Display chat history | |
| for message in chat_history: | |
| role = message["role"] | |
| content = message["content"] | |
| if role == "user": | |
| with st.chat_message("user"): | |
| st.write(content) | |
| else: | |
| with st.chat_message("assistant"): | |
| st.write(content) | |
| # Show sources if available | |
| sources = message.get("sources_used", []) | |
| if sources: | |
| with st.expander("π Sources"): | |
| unique_sources = {} | |
| for src in sources: | |
| key = src["source_id"] | |
| if key not in unique_sources: | |
| unique_sources[key] = src | |
| for src in unique_sources.values(): | |
| st.caption(f"β’ {src['filename']} ({src['source_type']})") | |
| # Chat input | |
| user_query = st.chat_input("Ask a question about your sources...") | |
| if user_query: | |
| # Display user message | |
| with st.chat_message("user"): | |
| st.write(user_query) | |
| # Query the notebook | |
| with st.chat_message("assistant"): | |
| with st.spinner("Thinking..."): | |
| response = query_notebook(notebook_id, user_query) | |
| if response: | |
| st.write(response["response"]) | |
| # Show sources | |
| sources = response.get("sources_used", []) | |
| if sources: | |
| with st.expander("π Sources"): | |
| unique_sources = {} | |
| for src in sources: | |
| key = src["source_id"] | |
| if key not in unique_sources: | |
| unique_sources[key] = src | |
| for src in unique_sources.values(): | |
| st.caption(f"β’ {src['filename']} ({src['source_type']})") | |
| # Show token usage | |
| token_usage = response.get("token_usage", {}) | |
| if token_usage: | |
| st.caption( | |
| f"π Used {token_usage.get('selected_tokens', 0):,} / " | |
| f"{token_usage.get('max_tokens', 0):,} tokens " | |
| f"({token_usage.get('usage_percent', 0):.1f}%)" | |
| ) | |
| else: | |
| st.error("Failed to get response") | |
| # Rerun to show the new message in history | |
| st.rerun() | |
| def main(): | |
| """Main application""" | |
| st.set_page_config( | |
| page_title="Antigravity Notebook", | |
| page_icon="π", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| init_session_state() | |
| # Render sidebar | |
| render_sidebar() | |
| # Render main chat area | |
| render_main_chat() | |
| if __name__ == "__main__": | |
| main() | |