USBジョイスティックを接続したRaspberry Pi 5(ubuntu24.04)IP 192.168.10.202から、サーボモーターSG90を接続したRaspberry Pi 4(ubuntu22.04)、IP 192.168.10.201へ、ネットワークを経由して操作信号を送ります。
TCPで定期的に信号を送る仕組みでは、サーボがカクカクして、動作も遅くリモート操作としては悪い状況です。UDPベースにしたらとても上手く動きました。
Raspberry Pi 5(ubuntu24.04)、IP 192.168.10.202
send.py
import socket
from inputs import devices, get_gamepad
import sys
# 送信先のIPとポート
server_ip = "192.168.10.201"
server_port = 5000
# ソケットの設定(UDP)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("接続待機中...")
# ゲームパッドの取得
try:
gamepad = devices.gamepads[0]
print(f"ゲームパッドが接続されました: {gamepad}")
except IndexError:
print("エラー: ゲームパッドが接続されていません。プログラムを終了します。")
sys.exit(1)
print(f"{server_ip}:{server_port} にデータを送信します。")
print("接続が確立されました。操作信号を送信します。")
try:
while True:
# イベントの取得
events = get_gamepad()
for event in events:
if event.ev_type == "Absolute" and event.code in ["ABS_X", "ABS_RY"]:
# 値を正規化
axis_value = event.state / 32767 # -1 から 1 の範囲に正規化
# データの作成(例: "ABS_X:0.5")
message = f"{event.code}:{axis_value:.3f}"
# データの送信
sock.sendto(message.encode('utf-8'), (server_ip, server_port))
# 操作信号の表示
print(f"送信: {message}")
except KeyboardInterrupt:
print("\nプログラムを終了します。")
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
sock.close()
Raspberry Pi 4(ubuntu22.04)、IP 192.168.10.201
receive.py
import socket
from gpiozero import PWMOutputDevice
from gpiozero.pins.pigpio import PiGPIOFactory
import time
import sys
# ソケットの設定
local_ip = "0.0.0.0"
local_port = 5000
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((local_ip, local_port))
print(f"ポート {local_port} でデータを待機しています...")
print("接続待機中...")
# GPIOの設定(PiGPIOFactoryを使って精密制御)
factory = PiGPIOFactory()
# サーボとESCの初期化
try:
# サーボ(GPIOピン12)
servo = PWMOutputDevice(12, pin_factory=factory, frequency=50) # 50HzのPWM信号
# ESC(GPIOピン13)
esc = PWMOutputDevice(13, pin_factory=factory, frequency=50) # 50HzのPWM信号
except Exception as e:
print(f"GPIOの初期化中にエラーが発生しました: {e}")
sys.exit(1)
def set_servo_angle(angle):
"""
サーボの角度を設定します。角度は-90度から90度の範囲です。
"""
min_pulse_width = 0.5 / 1000
max_pulse_width = 2.4 / 1000
frame_width = 1 / 50
pulse_width = ((angle + 90) / 180) * (max_pulse_width - min_pulse_width) + min_pulse_width
duty_cycle = pulse_width / frame_width
servo.value = duty_cycle
def set_esc_speed(pulse_width_ms):
"""
ESCの速度を設定します。パルス幅はミリ秒単位で指定します。
"""
pulse_width_s = pulse_width_ms / 1000
frame_width_s = 1 / 50
duty_cycle = pulse_width_s / frame_width_s
esc.value = duty_cycle
def initialize_devices():
"""
サーボとESCを安全な初期状態に設定します。
"""
set_servo_angle(0)
set_esc_speed(1.5)
time.sleep(1)
def shutdown_devices():
"""
サーボとESCを安全な停止状態に設定します。
"""
set_servo_angle(0)
set_esc_speed(1.5)
time.sleep(1)
servo.close()
esc.close()
def main():
# デバイスの初期化
initialize_devices()
connection_established = False
try:
while True:
# データの受信
data, addr = sock.recvfrom(1024)
message = data.decode('utf-8')
if not connection_established:
print(f"送信元 {addr[0]} と接続が確立されました。")
connection_established = True
if ":" in message:
code, value = message.split(":")
axis_value = float(value)
if code == "ABS_X":
angle = axis_value * 90
angle = max(min(angle, 90), -90)
set_servo_angle(angle)
print(f"受信: {message} -> サーボ角度を {angle:.1f} 度に設定")
elif code == "ABS_RY":
if abs(axis_value) < 0.05: axis_value = 0 pulse_width = 1.5 + (axis_value * 0.1) pulse_width = max(min(pulse_width, 1.6), 1.4) set_esc_speed(pulse_width) print(f"受信: {message} -> ESCパルス幅を {pulse_width:.2f} msに設定")
except KeyboardInterrupt:
print("\nプログラムを終了します。")
except Exception as e:
print(f"エラーが発生しました: {e}")
finally:
# デバイスを安全な状態にシャットダウン
shutdown_devices()
sock.close()
print("デバイスを安全な状態に設定しました。")
if __name__ == "__main__":
main()
サーボモーターの制御が素早く滑らかにするためinputsライブラリにより、ジョイスティックが変化した時だけイベントを送信することで、処理を軽くしつつシステムのリアルタイム性が向上しました。
データ転送にUDPソケットを使用しました。UDPはコネクションレスプロトコルであり、コネクションの確立や状態管理が不要なため、TCPよりもオーバーヘッドが少なくデータ送受信が速く行えます。
受信側では非ブロッキングソケットを設定し、データがない場合でも処理が停止することなく、サーボモーターの位置を連続して更新できるようにしました。これにより、サーボの反応性が向上し、システム全体の反応性も良くなりました。