Skip to content
Snippets Groups Projects
wifi_server.py 6.33 KiB
Newer Older
  • Learn to ignore specific revisions
  • dl35's avatar
    dl35 committed
    import socket
    import picar_4wd as fc
    import time
    import os
    import json
    import threading
    import numpy as np
    
    
    HOST = "192.168.31.81" # IP address of your Raspberry PI
    PORT = 65432          # Port to listen on (non-privileged ports are > 1023)
    SPEED = 20            # default speed
    cur_speed = SPEED
    Forward_count = 0
    Backward_count = 0
    car_status = {
        # "battery": 0.0,
        "direction": "stop",
        "temperature": 0.0,
        "speed": 0.0,
        "distance_traveled": 0.0,
        "timestamp": 0
    }
    
    # Flag to control the background thread
    running = True
    
    def get_battery_level():
        voltage = fc.power_read()  # Read voltage from picar_4wd
        # Map voltage to percentage (adjust min/max values for your battery)
        min_voltage = 6.0
        max_voltage = 8.4
        percentage = ((voltage - min_voltage) / (max_voltage - min_voltage)) * 100
        return max(0, min(100, round(percentage, 1)))
    
    
    def get_pi_temperature():
        # Read CPU temperature from system file
        temp = os.popen("vcgencmd measure_temp").readline()
        temp = float(temp.replace("temp=", "").replace("'C", ""))
        return temp
    
    # def estimate_speed():
    #     # This is just an approximation - ideally you'd use wheel encoders
    #     # You might need to adjust this based on your specific motor settings
    #     # This assumes fc.speed gives a value between -100 and 100
    #     motor_speed = fc.speed  # You may need to adjust this based on your library
    #     # Convert motor power level to approximate cm/s
    #     # This conversion factor (0.5) needs calibration for your specific car
    #     speed_cms = np.absolute(motor_speed) * 0.5  
    #     return round(speed_cms, 2)
    
    def update_car_status():
        global car_status
        last_update_time = time.time()
        last_distance = 0.0
        
        while running:
            current_time = time.time()
            # elapsed_time = current_time - last_update_time
            
            # Update battery level
            # car_status["battery"] = get_battery_level()
            
            # Update temperature
            car_status["temperature"] = get_pi_temperature()
            
            # Update speedw
            # current_speed = estimate_speed()
            car_status["speed"] = cur_speed
            
            # Calculate distance traveled (speed * time)
            distance_increment = (Forward_count + Backward_count) * 0.03
            car_status["distance_traveled"] = distance_increment
            car_status["timestamp"] = int(current_time)
            
            last_update_time = current_time
            time.sleep(0.1)  # Update 10 times per second
    
    # Start the background thread to update car status
    status_thread = threading.Thread(target=update_car_status)
    status_thread.daemon = True
    status_thread.start()
    
    def car_movement(direction):
        # Map key codes to car movements
        # 87 - W (forward)
        # 83 - S (backward)
        # 65 - A (left)
        # 68 - D (right)
        global SPEED, cur_speed
        global Forward_count
        global Backward_count
        if direction == 87:  # W - Forward
            print("Moving forward")
            car_status["direction"] = "forward"
            Forward_count += 1
            cur_speed = SPEED
            fc.forward(cur_speed)  # Speed value can be adjusted
        elif direction == 83:  # S - Backward
            print("Moving backward")
            car_status["direction"] = "backward"
            Backward_count += 1
            cur_speed = SPEED
            fc.backward(cur_speed)
        elif direction == 65:  # A - Left
            print("Turning left")
            car_status["direction"] = "left"
            cur_speed = SPEED / 2
            fc.turn_left(cur_speed)
        elif direction == 68:  # D - Right
            print("Turning right")
            car_status["direction"] = "right"
            cur_speed = SPEED / 2
            fc.turn_right(cur_speed)
        else:
            # If anything else comes in, stop the car
            cur_speed = 0
            car_status["direction"] = "stop"
            print("Stopping car")
            fc.stop()
        
        # You might want to return some status information
        return direction
    
    
    # with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    #     s.bind((HOST, PORT))
    #     s.listen()
    #     # fc.cpu_temperature()
        
    #     try:
    #         while 1:
    #             client, clientInfo = s.accept()
    #             print("server recv from: ", clientInfo)
    #             data = client.recv(1024)      # receive 1024 Bytes of message in binary format
    #             if data != b"":
    #                 print(f"Received: {data}")
    #                 # Convert bytes to integer, removing trailing \r\n
    #                 direction = int(data.decode().strip())
    #                 status = car_movement(direction)
    #                 # Send back status information
    #                 client.sendall(f"Received command: {direction}, {status}".encode())
    #     except: 
    #         print("Closing socket")
    #         client.close()
    #         s.close()    
    
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((HOST, PORT))
        s.listen()
        print(f"Server started, listening on {HOST}:{PORT}")
    
        try:
            while True:
                client, clientInfo = s.accept()
                print("server recv from: ", clientInfo)
                data = client.recv(1024)
                # car_movement(0)
                if data != b"":
                    print(data)
                    command = data.decode().strip()
                    
                    if command == "GET_STATUS":
                        # Send full car status
                        response = json.dumps(car_status)
                        client.sendall(response.encode())
                    else:
                        
                        # Assume it's a direction command
                        direction = int(command)
                        ret_val = car_movement(direction)
                        
                        # Send back status along with confirmation
                        response = json.dumps({
                            "command_received": direction,
                            "status": car_status
                        })
                        client.sendall(response.encode())
                        # except ValueError:
                        #     print(f"Invalid command: {command}")
                        #     client.sendall(b"Invalid command format")
                        #     break
                        # finally:
                        #     client.close()
                else:
                    fc.stop()
    
                client.close()
        except Exception as e:
            print(f"Error: {e}")
        finally:
            fc.stop()
            print("Closing server")
            running = False  # Stop the background thread
            s.close()