# streaming.py
import asyncio
import logging

logger = logging.getLogger(__name__)


async def stream_in_chunks(
    publish,
    exchange: str,
    llm_obj_builder,
    text: str,
    batch_size: int = 3,
    max_chars: int = 100,
    base_delay_ms: int = 30,
    per_char_ms: int = 2,
) -> None:
    """Publish *text* piece by piece, pausing between pieces.

    The text is split into tokens, each ending with (and including) one of
    the separator characters `` ,!?{}.:;`` or a newline. Completed tokens are
    buffered and flushed as one chunk once the buffer holds at least
    ``batch_size`` tokens or at least ``max_chars`` characters.

    Args:
        publish: Awaitable callable invoked as ``publish(exchange, obj)``.
        exchange: Passed through unchanged as the first argument of
            ``publish``.
        llm_obj_builder: Callable that receives the chunk string and returns
            the object handed to ``publish``.
        text: The full text to stream.
        batch_size: Number of buffered tokens that triggers a flush.
        max_chars: Buffered character count that triggers a flush. This is a
            soft threshold: it is checked only after a token has been
            appended, so a chunk may be longer than ``max_chars``, and text
            without any separator is never split.
        base_delay_ms: Fixed part of the pause after a flush, in
            milliseconds.
        per_char_ms: Additional pause per character of the flushed chunk, in
            milliseconds.

    Returns:
        None. Nothing is published for empty ``text``.
    """
    separators = frozenset(" ,!?{}.:;\n")
    pending = []  # completed tokens waiting to be flushed
    pending_chars = 0  # running total of characters held in ``pending``
    token = []  # characters of the token currently being read

    for ch in text:
        token.append(ch)
        if ch not in separators:
            continue

        # A separator closes the current token (the separator stays in it).
        pending.append("".join(token))
        pending_chars += len(token)
        token.clear()

        if len(pending) >= batch_size or pending_chars >= max_chars:
            await publish(exchange, llm_obj_builder("".join(pending)))
            # Pause proportionally to the size of the chunk just sent.
            delay_ms = base_delay_ms + per_char_ms * pending_chars
            await asyncio.sleep(delay_ms / 1000)
            pending.clear()
            pending_chars = 0

    # Trailing characters after the last separator join the final chunk,
    # which is published without a trailing pause.
    if token:
        pending.append("".join(token))
    if pending:
        await publish(exchange, llm_obj_builder("".join(pending)))