# SAMPLE schema for ADI Pluto SDR. # Place into ~/.config/remoterf/drivers/pluto_schema.py if you want to # customize or override the built-in Pluto schema. from remoteRF_server.common.idl import DeviceSchema, idl_expose, idl_register import adi import re import subprocess def connect_pluto(*, serial: str): serial = (serial or "").strip() if not serial: print("A Pluto serial must be provided") return None try: out = subprocess.check_output( ["iio_info", "-s"], text=True, stderr=subprocess.STDOUT, ) usb = None for line in out.splitlines(): if (f"serial={serial}" in line) or (f"hw_serial={serial}" in line): match = re.search(r"\[usb:([^\]]+)\]", line) if match: usb = match.group(1).strip() break if not usb: print(f"No device found with serial {serial}") return None dev = adi.Pluto(f"usb:{usb}") print(f"Connected to Pluto serial={serial} via usb:{usb}") return dev except Exception as exc: print(f"Failed to connect to Pluto serial={serial}: {exc}") return None @idl_register("pluto") class PlutoSchema(DeviceSchema): device_type = "pluto" driver_version = "0.0.1" @staticmethod def make_device(**kwargs): return connect_pluto(serial=kwargs.get("serial")) @idl_expose(kind="get") def get_sample_rate(self): return self.device.sample_rate @idl_expose(kind="set") def set_sample_rate(self, value): self.device.sample_rate = value @idl_expose(kind="get") def get_rx_lo(self): return self.device.rx_lo @idl_expose(kind="set") def set_rx_lo(self, value): self.device.rx_lo = value @idl_expose(kind="get") def get_tx_lo(self): return self.device.tx_lo @idl_expose(kind="set") def set_tx_lo(self, value): self.device.tx_lo = value @idl_expose(kind="call") def call_rx(self): return self.device.rx() @idl_expose(kind="call") def call_tx(self, value): self.device.tx(value) @idl_expose(kind="call") def call_ip(self): try: return str(self.device.uri) except AttributeError: return str(self.device._uri_auto) @idl_expose(kind="get") def get_repr(self): return repr(self.device)