"""TCP control server for a PiCar-4WD.

Each client connection carries a single request: either the literal text
``GET_STATUS`` or an integer key code (87/83/65/68 for W/S/A/D).  The server
answers with a JSON document and closes the connection.  A background thread
keeps the shared ``car_status`` dictionary up to date.
"""

import json
import os
import socket
import threading
import time

import numpy as np
import picar_4wd as fc

HOST = "192.168.31.81"
PORT = 65432
SPEED = 20  # default speed

# Key codes accepted as movement commands.
KEY_FORWARD = 87   # W
KEY_BACKWARD = 83  # S
KEY_LEFT = 65      # A
KEY_RIGHT = 68     # D

# Factor applied to the number of forward/backward commands to obtain
# "distance_traveled".  NOTE(review): unit is presumably metres per command;
# confirm against the hardware.
DISTANCE_PER_COMMAND = 0.03

cur_speed = SPEED
Forward_count = 0
Backward_count = 0

# Shared status snapshot, written by the status thread and car_movement(),
# and serialised to JSON for clients.
car_status = {
    "direction": "stop",
    "temperature": 0.0,
    "speed": 0.0,
    "distance_traveled": 0.0,
    "timestamp": 0,
}

# Cleared on shutdown so the background status loop exits.
running = True


def get_battery_level():
    """Return the battery charge as a percentage in the range 0-100.

    The voltage reported by ``fc.power_read()`` is mapped linearly from
    6.0 V (0 %) to 8.4 V (100 %), rounded to one decimal and clamped.
    """
    voltage = fc.power_read()
    min_voltage = 6.0
    max_voltage = 8.4
    percentage = ((voltage - min_voltage) / (max_voltage - min_voltage)) * 100
    return max(0, min(100, round(percentage, 1)))


def get_pi_temperature():
    """Return the temperature reported by ``vcgencmd measure_temp`` as a float.

    Raises:
        ValueError: if the command output cannot be parsed as a number
            (for example when the command produced no output).
    """
    # Use the pipe as a context manager so it is closed after every reading.
    with os.popen("vcgencmd measure_temp") as pipe:
        raw = pipe.readline()
    return float(raw.replace("temp=", "").replace("'C", ""))


def update_car_status():
    """Refresh ``car_status`` every 0.1 s until ``running`` becomes false.

    Updates temperature, speed, distance travelled and timestamp.  Intended
    to run in a background thread.
    """
    temperature_warned = False
    while running:
        current_time = time.time()
        try:
            car_status["temperature"] = get_pi_temperature()
        except ValueError as exc:
            # Keep the previous reading rather than letting one bad sample
            # kill the status thread; warn only once to avoid log spam.
            if not temperature_warned:
                print(f"Could not read temperature: {exc}")
                temperature_warned = True
        car_status["speed"] = cur_speed
        # Total derived from the command counters (not an increment).
        car_status["distance_traveled"] = (
            (Forward_count + Backward_count) * DISTANCE_PER_COMMAND
        )
        car_status["timestamp"] = int(current_time)
        time.sleep(0.1)


status_thread = threading.Thread(target=update_car_status)
status_thread.daemon = True
status_thread.start()


def car_movement(direction):
    """Drive the car according to a key code and record it in ``car_status``.

    Key codes:
        87 - W (forward)
        83 - S (backward)
        65 - A (left)
        68 - D (right)
    Any other value stops the car.

    Forward/backward run at ``SPEED``; turns run at half of ``SPEED``.

    Args:
        direction: the key code received from the client.

    Returns:
        The ``direction`` argument, unchanged.
    """
    global SPEED, cur_speed
    global Forward_count
    global Backward_count

    if direction == KEY_FORWARD:
        print("Moving forward")
        car_status["direction"] = "forward"
        Forward_count += 1
        cur_speed = SPEED
        fc.forward(cur_speed)
    elif direction == KEY_BACKWARD:
        print("Moving backward")
        car_status["direction"] = "backward"
        Backward_count += 1
        cur_speed = SPEED
        fc.backward(cur_speed)
    elif direction == KEY_LEFT:
        print("Turning left")
        car_status["direction"] = "left"
        cur_speed = SPEED / 2
        fc.turn_left(cur_speed)
    elif direction == KEY_RIGHT:
        print("Turning right")
        car_status["direction"] = "right"
        cur_speed = SPEED / 2
        fc.turn_right(cur_speed)
    else:
        # If anything else comes in, stop the car
        cur_speed = 0
        car_status["direction"] = "stop"
        print("Stopping car")
        fc.stop()
    return direction


def _handle_client(client):
    """Serve the single request carried by one client connection.

    An empty read stops the car without replying.  ``GET_STATUS`` is answered
    with the status dictionary; an integer command is executed and echoed
    back together with the status.  A command that cannot be decoded or
    parsed as an integer stops the car and is answered with an error object
    instead of propagating and terminating the server.
    """
    data = client.recv(1024)
    if data == b"":
        fc.stop()
        return

    print(data)
    try:
        command = data.decode().strip()
        direction = None if command == "GET_STATUS" else int(command)
    except ValueError as exc:  # also covers UnicodeDecodeError
        print(f"Invalid command {data!r}: {exc}")
        # Unrecognised input stops the car, like any unmapped key code.
        car_movement(0)
        response = {"error": "invalid command", "status": car_status}
    else:
        if direction is None:
            # Send full car status
            response = car_status
        else:
            car_movement(direction)
            # Send back status along with confirmation
            response = {"command_received": direction, "status": car_status}
    client.sendall(json.dumps(response).encode())


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen()
    print(f"Server started, listening on {HOST}:{PORT}")

    try:
        while True:
            client, clientInfo = s.accept()
            print("server recv from: ", clientInfo)
            # The with-block closes the client socket even when handling fails.
            with client:
                try:
                    _handle_client(client)
                except OSError as e:
                    # A broken connection must not take the whole server down.
                    print(f"Connection error: {e}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        fc.stop()
        print("Closing server")
        running = False  # Stop the background thread