| | import torch |
| | from torch import nn |
| | from .vision_encoder import VisionEncoder |
| | from .configuration_moondream import MoondreamConfig |
| | from transformers import PreTrainedModel |
| | import re |
| |
|
| | from .modeling_phi import PhiForCausalLM |
| | from .configuration_moondream import PhiConfig |
| |
|
| | class Moondream(PreTrainedModel): |
| | config_class = MoondreamConfig |
| |
|
| | def __init__(self, config): |
| | super().__init__(config) |
| | self.vision_encoder = VisionEncoder() |
| |
|
| | if type(config.phi_config) == dict: |
| | phi_config = PhiConfig(**config.phi_config) |
| | else: |
| | phi_config = config.phi_config |
| | self.text_model = PhiForCausalLM(phi_config) |
| |
|
| | @property |
| | def device(self): |
| | return self.text_model.device |
| |
|
| | def encode_image(self, image): |
| | return self.vision_encoder(image) |
| |
|
| | def input_embeds(self, prompt, image_embeds, tokenizer): |
| | def _tokenize(txt): |
| | return tokenizer( |
| | txt, return_tensors="pt", add_special_tokens=False |
| | ).input_ids.to(self.device) |
| |
|
| | text_emb = self.text_model.get_input_embeddings() |
| |
|
| | |
| | embeds = [] |
| | embeds.append( |
| | text_emb((torch.tensor([[tokenizer.bos_token_id]], device=self.device))) |
| | ) |
| |
|
| | if "<image>" not in prompt: |
| | embeds.append(text_emb(_tokenize(prompt))) |
| | else: |
| | assert prompt.count("<image>") == 1 |
| | before, after = prompt.split("<image>") |
| | embeds.append(text_emb(_tokenize(f"{before}<image>"))) |
| | embeds.append(image_embeds.to(self.device)) |
| | embeds.append(text_emb(_tokenize(f"</image>{after}"))) |
| |
|
| | return torch.cat(embeds, dim=1) |
| |
|
| | def generate( |
| | self, |
| | image_embeds, |
| | prompt, |
| | tokenizer, |
| | eos_text="<END>", |
| | max_new_tokens=128, |
| | **kwargs, |
| | ): |
| | eos_tokens = tokenizer(eos_text, add_special_tokens=False)[0].ids |
| |
|
| | generate_config = { |
| | "eos_token_id": eos_tokens, |
| | "bos_token_id": tokenizer.bos_token_id, |
| | "pad_token_id": tokenizer.eos_token_id, |
| | "max_new_tokens": max_new_tokens, |
| | **kwargs, |
| | } |
| |
|
| | with torch.no_grad(): |
| | inputs_embeds = self.input_embeds(prompt, image_embeds, tokenizer) |
| | output_ids = self.text_model.generate( |
| | inputs_embeds=inputs_embeds, **generate_config |
| | ) |
| |
|
| | return tokenizer.batch_decode(output_ids, skip_special_tokens=True) |
| |
|
| | def answer_question( |
| | self, |
| | image_embeds, |
| | question, |
| | tokenizer, |
| | chat_history="", |
| | result_queue=None, |
| | **kwargs, |
| | ): |
| | prompt = f"<image>\n\n{chat_history}Question: {question}\n\nAnswer: " |
| | answer = self.generate( |
| | image_embeds, |
| | prompt, |
| | eos_text="<END>", |
| | tokenizer=tokenizer, |
| | max_new_tokens=256, |
| | **kwargs, |
| | )[0] |
| | cleaned_answer = re.sub("<$", "", re.sub("END$", "", answer)).strip() |
| |
|
| | |
| | if result_queue: |
| | result_queue.put(cleaned_answer) |
| | else: |
| | return cleaned_answer |
| |
|