|
|
import asyncio
import base64
import json
import time
from datetime import datetime
from io import BytesIO

import mss
import numpy as np
import pyautogui
import requests
import websockets
from PIL import Image
|
|
|
|
|
|
|
|
class Dino:
    """Control Chrome's dinosaur game through the DevTools protocol.

    The debugger websocket URL is discovered from the DevTools HTTP endpoint
    on ``localhost:1234``; every other interaction is a JSON command sent
    over that websocket (see `send_command`).  Two helpers bypass the
    protocol and act on the desktop instead: `send_key_event2` (pyautogui)
    and `capture_screenshot2` (mss).

    Most coroutines follow the same error convention: any exception is
    printed and the method returns ``None`` instead of raising.
    """

    # Seconds the key with keyCode 40 is held down before it is released.
    _DUCK_HOLD_SECONDS = 0.4
    # Seconds to wait for the local DevTools HTTP endpoint before giving up.
    _HTTP_TIMEOUT_SECONDS = 10

    def __init__(self, class_name):
        """Store the target CSS class and resolve the debugger websocket URL.

        Args:
            class_name: CSS class name (without the leading dot) of the
                element that `capture_screenshot` clips to.

        Note:
            This performs a blocking HTTP request through `get_ws_url`, so
            any exception raised there propagates out of the constructor.
        """
        self.class_name = class_name
        self.ws_url = self.get_ws_url()
        self.websocket = None  # populated by connect()
        self.command_id = 1  # id assigned to the next DevTools command

    @staticmethod
    def get_ws_url():
        """Return the ``webSocketDebuggerUrl`` of the first listed target.

        Queries ``http://localhost:1234/json`` and reads the first entry of
        the returned JSON list.

        Raises:
            requests.RequestException: if the endpoint cannot be reached or
                does not answer within the timeout.
            IndexError, KeyError: if the target list is empty or the first
                entry has no ``webSocketDebuggerUrl``.
        """
        # Without a timeout requests waits indefinitely for an endpoint that
        # accepts the connection but never answers.
        response = requests.get(
            'http://localhost:1234/json',
            timeout=Dino._HTTP_TIMEOUT_SECONDS,
        )
        targets = response.json()
        return targets[0]['webSocketDebuggerUrl']

    async def connect(self):
        """Open the debugger websocket and enable the protocol domains used.

        Enables, in order, the DOM, CSS, Page and Runtime domains.
        """
        self.websocket = await websockets.connect(self.ws_url)
        for domain in ("DOM", "CSS", "Page", "Runtime"):
            await self.send_command(f"{domain}.enable", {})

    async def send_command(self, method, params):
        """Send one DevTools command and return its decoded reply.

        Args:
            method: Protocol method name, e.g. ``"Runtime.evaluate"``.
            params: JSON-serialisable parameter dict for the method.

        Returns:
            The decoded JSON message whose ``id`` matches this command.
            Messages received meanwhile that carry a different (or no)
            ``id`` are discarded.
        """
        # Reserve the id before the first await so the counter is already
        # advanced whenever control is handed back to the event loop.
        command_id = self.command_id
        self.command_id += 1

        payload = {"id": command_id, "method": method, "params": params}
        await self.websocket.send(json.dumps(payload))

        while True:
            message = json.loads(await self.websocket.recv())
            if message.get("id") == command_id:
                return message

    async def _evaluate(self, expression):
        """Evaluate a JavaScript expression in the page and return its value.

        Raises:
            KeyError: if the reply carries no ``result.result.value``.
        """
        reply = await self.send_command(
            "Runtime.evaluate", {"expression": expression}
        )
        return reply["result"]["result"]["value"]

    async def capture_screenshot(self):
        """Capture the element with class `class_name` as a 30x92 image.

        The element's content box is screenshotted through the protocol,
        shrunk to one fifth of its size, cropped to columns 52-81 and then
        resized to 30x92 pixels.

        Returns:
            A ``PIL.Image.Image``, or ``None`` if any step failed (the
            error is printed).
        """
        try:
            document = await self.send_command("DOM.getDocument", {"depth": -1})
            root_node_id = document["result"]["root"]["nodeId"]

            match = await self.send_command(
                "DOM.querySelector",
                {"nodeId": root_node_id, "selector": f".{self.class_name}"},
            )
            node_id = match["result"]["nodeId"]

            box_model = await self.send_command(
                "DOM.getBoxModel", {"nodeId": node_id}
            )
            # The content quad is a flat list of corner coordinates
            # [x1, y1, x2, y2, x3, y3, x4, y4]; indices 0/1 give the first
            # corner, index 2 the second corner's x, index 5 the third
            # corner's y.
            quad = box_model["result"]["model"]["content"]
            left, top, right, bottom = quad[0], quad[1], quad[2], quad[5]

            screenshot = await self.send_command("Page.captureScreenshot", {
                "clip": {
                    "x": left,
                    "y": top,
                    "width": right - left,
                    "height": bottom - top,
                    "scale": 1,
                },
            })

            raw_png = base64.b64decode(screenshot["result"]["data"])
            image = Image.open(BytesIO(raw_png))

            # Downscale by 5 in each dimension, keep a 30-pixel-wide strip
            # starting at column 52, then force the final 30x92 size.
            shrunk = image.resize((image.width // 5, image.height // 5))
            strip = shrunk.crop((52, 0, 82, shrunk.height))
            return strip.resize((30, 92))
        except Exception as e:
            print(f"An error occurred: {e}")
            return None

    async def get_window_name(self):
        """Print and return the page's ``window.name``; ``None`` on error."""
        try:
            window_name = await self._evaluate("window.name")
            print(f"Window name: {window_name}")
            return window_name
        except Exception as e:
            print(f"An error occurred while getting window name: {e}")
            return None

    async def enable_all_obstacles(self):
        """Set ``minSpeed`` of the obstacle definition at index 2 to 0.

        Returns:
            ``True`` once the command has been answered, ``None`` if sending
            or receiving failed (the error is printed).

        NOTE(review): the reply is not inspected, so a JavaScript exception
        inside the page still results in ``True`` - confirm that is intended.
        """
        try:
            await self.send_command("Runtime.evaluate", {
                "expression": "spriteDefinitionByType.original.OBSTACLES[2].minSpeed = 0"
            })
            print("Enabled all obstacles")
            return True
        except Exception as e:
            print(f"An error occurred while enabling obstacles: {e}")
            return None

    async def open_dino(self):
        """Navigate the page to ``chrome://dino/``.

        Returns:
            ``True`` once the command has been answered, ``None`` on error.
        """
        try:
            await self.send_command("Page.navigate", {"url": "chrome://dino/"})
            return True
        except Exception as e:
            print(f"An error occurred while opening game: {e}")
            return None

    async def send_key_event(self, key, code, key_code):
        """Press and release a key through ``Input.dispatchKeyEvent``.

        Args:
            key: Value sent as the event's ``key``.
            code: Value sent as the event's ``code``.
            key_code: Number sent as ``keyCode``, ``windowsVirtualKeyCode``
                and ``nativeVirtualKeyCode``.

        Returns:
            ``True`` after both events were answered, ``None`` on error.
        """
        try:
            event = {
                "key": key,
                "code": code,
                "keyCode": key_code,
                "windowsVirtualKeyCode": key_code,
                "nativeVirtualKeyCode": key_code,
                "modifiers": 0,
            }

            await self.send_command(
                "Input.dispatchKeyEvent", {"type": "rawKeyDown", **event}
            )

            if key_code == 40:
                # Hold the key for a moment.  asyncio.sleep yields to the
                # event loop; time.sleep would freeze every other coroutine
                # for the duration of the hold.
                await asyncio.sleep(self._DUCK_HOLD_SECONDS)

            await self.send_command(
                "Input.dispatchKeyEvent", {"type": "keyUp", **event}
            )
            return True
        except Exception as e:
            print(f"An error occurred while sending key event: {e}")
            return None

    async def send_key_event2(self, key):
        """Press `key` with ``pyautogui.press`` instead of the protocol.

        Returns:
            ``True`` on success, ``None`` on error (the error is printed).
        """
        try:
            pyautogui.press(key)
            return True
        except Exception as e:
            print(f"An error occurred while sending key event: {e}")
            return None

    async def check_status(self):
        """Read the runner's crash flag and score from the page.

        Returns:
            ``{"crashed": <Runner.instance_.crashed>, "score": <float>}``
            where score is ``distanceRan // 10``, or ``0.0`` when the
            distance is missing or not numeric.  ``None`` if the crash flag
            itself could not be read (the error is printed).
        """
        try:
            crashed = await self._evaluate("Runner.instance_.crashed")

            score = 0.0
            try:
                distance = await self._evaluate("Runner.instance_.distanceRan")
                score = float(distance) // 10
            except (KeyError, TypeError, ValueError):
                # Best effort: an absent or non-numeric distance keeps the
                # default score.  Cancellation and transport errors are
                # deliberately not swallowed here.
                pass

            return {"crashed": crashed, "score": score}
        except Exception as e:
            print(f"An error occurred while checking status: {e}")
            return None

    async def complete_action(self):
        """Wait until the dinosaur is neither jumping nor ducking.

        Polls ``Runner.instance_.tRex.jumping``, ``.tRex.ducking`` and
        ``Runner.instance_.crashed`` and stops as soon as the game has
        crashed or both motion flags are false.  Returns immediately if the
        game is already crashed.

        Returns:
            Always ``None``; errors are printed rather than raised.
        """
        try:
            if await self._evaluate("Runner.instance_.crashed"):
                return None

            while True:
                jumping = await self._evaluate("Runner.instance_.tRex.jumping")
                ducking = await self._evaluate("Runner.instance_.tRex.ducking")
                crashed = await self._evaluate("Runner.instance_.crashed")
                if crashed or not (jumping or ducking):
                    return None
        except Exception as e:
            print(f"An error occurred while selecting action: {e}")
            return None

    async def capture_screenshot2(self):
        """Grab a fixed 200x45 screen region with mss as an RGB image.

        The region starts at left=730, top=245 in screen coordinates.

        Returns:
            A ``PIL.Image.Image``, or ``None`` on error (the error is
            printed).
        """
        try:
            region = {
                "top": 245,
                "left": 730,
                "width": 200,
                "height": 45,
            }
            with mss.mss() as sct:
                grabbed = sct.grab(region)

                pixels = np.array(grabbed)
                # Keep the first three channels and reverse their order
                # (presumably BGRA from mss -> RGB; confirm against mss docs).
                pixels = pixels[:, :, :3]
                pixels = pixels[..., ::-1]

                return Image.fromarray(pixels)
        except Exception as e:
            print(f"An error occurred while capturing screenshot: {e}")
            return None

    async def start(self):
        """Connect to the browser (see `connect`)."""
        await self.connect()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|