USBジョイスティックを接続したRaspberry Pi 5(ubuntu24.04)IP 192.168.10.202から、サーボモーターSG90を接続したRaspberry Pi 4(ubuntu22.04)、IP 192.168.10.201へ、ネットワークを経由して操作信号を送ります。

TCPで定期的に信号を送る仕組みでは、サーボがカクカクして、動作も遅くリモート操作としては悪い状況です。UDPベースにしたらとても上手く動きました。

 

Raspberry Pi 5(ubuntu24.04)、IP 192.168.10.202

send.py

import socket
from inputs import devices, get_gamepad
import sys

# 送信先のIPとポート
server_ip = "192.168.10.201"
server_port = 5000

# ソケットの設定(UDP)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

print("接続待機中...")

# ゲームパッドの取得
try:
    gamepad = devices.gamepads[0]
    print(f"ゲームパッドが接続されました: {gamepad}")
except IndexError:
    print("エラー: ゲームパッドが接続されていません。プログラムを終了します。")
    sys.exit(1)

print(f"{server_ip}:{server_port} にデータを送信します。")
print("接続が確立されました。操作信号を送信します。")

try:
    while True:
        # イベントの取得
        events = get_gamepad()
        for event in events:
            if event.ev_type == "Absolute" and event.code in ["ABS_X", "ABS_RY"]:
                # 値を正規化
                axis_value = event.state / 32767  # -1 から 1 の範囲に正規化
                # データの作成(例: "ABS_X:0.5")
                message = f"{event.code}:{axis_value:.3f}"
                # データの送信
                sock.sendto(message.encode('utf-8'), (server_ip, server_port))
                # 操作信号の表示
                print(f"送信: {message}")
except KeyboardInterrupt:
    print("\nプログラムを終了します。")
except Exception as e:
    print(f"エラーが発生しました: {e}")
finally:
    sock.close()

 

Raspberry Pi 4(ubuntu22.04)、IP 192.168.10.201

receive.py

import socket
from gpiozero import PWMOutputDevice
from gpiozero.pins.pigpio import PiGPIOFactory
import time
import sys

# ソケットの設定
local_ip = "0.0.0.0"
local_port = 5000

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((local_ip, local_port))

print(f"ポート {local_port} でデータを待機しています...")
print("接続待機中...")

# GPIOの設定(PiGPIOFactoryを使って精密制御)
factory = PiGPIOFactory()

# サーボとESCの初期化
try:
    # サーボ(GPIOピン12)
    servo = PWMOutputDevice(12, pin_factory=factory, frequency=50)  # 50HzのPWM信号
    # ESC(GPIOピン13)
    esc = PWMOutputDevice(13, pin_factory=factory, frequency=50)    # 50HzのPWM信号
except Exception as e:
    print(f"GPIOの初期化中にエラーが発生しました: {e}")
    sys.exit(1)

def set_servo_angle(angle):
    """
    サーボの角度を設定します。角度は-90度から90度の範囲です。
    """
    min_pulse_width = 0.5 / 1000
    max_pulse_width = 2.4 / 1000
    frame_width = 1 / 50
    pulse_width = ((angle + 90) / 180) * (max_pulse_width - min_pulse_width) + min_pulse_width
    duty_cycle = pulse_width / frame_width
    servo.value = duty_cycle

def set_esc_speed(pulse_width_ms):
    """
    ESCの速度を設定します。パルス幅はミリ秒単位で指定します。
    """
    pulse_width_s = pulse_width_ms / 1000
    frame_width_s = 1 / 50
    duty_cycle = pulse_width_s / frame_width_s
    esc.value = duty_cycle

def initialize_devices():
    """
    サーボとESCを安全な初期状態に設定します。
    """
    set_servo_angle(0)
    set_esc_speed(1.5)
    time.sleep(1)

def shutdown_devices():
    """
    サーボとESCを安全な停止状態に設定します。
    """
    set_servo_angle(0)
    set_esc_speed(1.5)
    time.sleep(1)
    servo.close()
    esc.close()

def main():
    # デバイスの初期化
    initialize_devices()

    connection_established = False

    try:
        while True:
            # データの受信
            data, addr = sock.recvfrom(1024)
            message = data.decode('utf-8')

            if not connection_established:
                print(f"送信元 {addr[0]} と接続が確立されました。")
                connection_established = True

            if ":" in message:
                code, value = message.split(":")
                axis_value = float(value)
                if code == "ABS_X":
                    angle = axis_value * 90
                    angle = max(min(angle, 90), -90)
                    set_servo_angle(angle)
                    print(f"受信: {message} -> サーボ角度を {angle:.1f} 度に設定")
                elif code == "ABS_RY":
                    if abs(axis_value) < 0.05: axis_value = 0 pulse_width = 1.5 + (axis_value * 0.1) pulse_width = max(min(pulse_width, 1.6), 1.4) set_esc_speed(pulse_width) print(f"受信: {message} -> ESCパルス幅を {pulse_width:.2f} msに設定")

    except KeyboardInterrupt:
        print("\nプログラムを終了します。")
    except Exception as e:
        print(f"エラーが発生しました: {e}")
    finally:
        # デバイスを安全な状態にシャットダウン
        shutdown_devices()
        sock.close()
        print("デバイスを安全な状態に設定しました。")

if __name__ == "__main__":
    main()

 

サーボモーターの制御が素早く滑らかにするためinputsライブラリにより、ジョイスティックが変化した時だけイベントを送信することで、処理を軽くしつつシステムのリアルタイム性が向上しました。

データ転送にUDPソケットを使用しました。UDPはコネクションレスプロトコルであり、コネクションの確立や状態管理が不要なため、TCPよりもオーバーヘッドが少なくデータ送受信が速く行えます。

受信側では非ブロッキングソケットを設定し、データがない場合でも処理が停止することなく、サーボモーターの位置を連続して更新できるようにしました。これにより、サーボの反応性が向上し、システム全体の反応性も良くなりました。