Adds support for a raw-mode Icom driver. This is required for the IC-2730A (part

of #2745).
This commit is contained in:
Rhett Robinson 2018-01-14 15:02:28 -08:00
parent c4aa46e654
commit 018494b7d9

View File

@ -85,6 +85,7 @@ class RadioStream:
def _process_frames(self): def _process_frames(self):
if not self.data.startswith("\xFE\xFE"): if not self.data.startswith("\xFE\xFE"):
LOG.error("Out of sync with radio:\n%s" % util.hexprint(self.data))
raise errors.InvalidDataError("Out of sync with radio") raise errors.InvalidDataError("Out of sync with radio")
elif len(self.data) < 5: elif len(self.data) < 5:
return [] # Not enough data for a full frame return [] # Not enough data for a full frame
@ -134,11 +135,11 @@ class RadioStream:
return self._process_frames() return self._process_frames()
def get_model_data(pipe, mdata="\x00\x00\x00\x00"): def get_model_data(radio, mdata="\x00\x00\x00\x00"):
"""Query the radio connected to @pipe for its model data""" """Query the @radio for its model data"""
send_clone_frame(pipe, 0xe0, mdata, raw=True) send_clone_frame(radio, 0xe0, mdata)
stream = RadioStream(pipe) stream = RadioStream(radio.pipe)
frames = stream.get_frames() frames = stream.get_frames()
if len(frames) != 1: if len(frames) != 1:
@ -164,27 +165,11 @@ def get_clone_resp(pipe, length=None):
return resp return resp
def send_clone_frame(pipe, cmd, data, raw=False, checksum=False): def send_clone_frame(radio, cmd, data, checksum=False):
"""Send a clone frame with @cmd and @data to the radio attached """Send a clone frame with @cmd and @data to the @radio"""
to @pipe""" payload = radio.get_payload(data, checksum)
cs = 0
if raw: frame = "\xfe\xfe\xee\xef%s%s\xfd" % (chr(cmd), payload)
hed = data
else:
hed = ""
for byte in data:
val = ord(byte)
hed += "%02X" % val
cs += val
if checksum:
cs = ((cs ^ 0xFFFF) + 1) & 0xFF
cs = "%02X" % cs
else:
cs = ""
frame = "\xfe\xfe\xee\xef%s%s%s\xfd" % (chr(cmd), hed, cs)
if SAVE_PIPE: if SAVE_PIPE:
LOG.debug("Saving data...") LOG.debug("Saving data...")
@ -197,30 +182,14 @@ def send_clone_frame(pipe, cmd, data, raw=False, checksum=False):
# return frame # return frame
pass pass
pipe.write(frame) radio.pipe.write(frame)
return frame return frame
def process_bcd(bcddata): def process_data_frame(radio, frame, _mmap):
"""Convert BCD-encoded data to raw"""
data = ""
i = 0
while i < range(len(bcddata)) and i+1 < len(bcddata):
try:
val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16)
i += 2
data += struct.pack("B", val)
except ValueError, e:
LOG.error("Failed to parse byte: %s" % e)
break
return data
def process_data_frame(frame, _mmap):
"""Process a data frame, adding the payload to @_mmap""" """Process a data frame, adding the payload to @_mmap"""
_data = process_bcd(frame.payload) _data = radio.process_frame_payload(frame.payload)
if len(_mmap) >= 0x10000: if len(_mmap) >= 0x10000:
saddr, = struct.unpack(">I", _data[0:4]) saddr, = struct.unpack(">I", _data[0:4])
length, = struct.unpack("B", _data[4]) length, = struct.unpack("B", _data[4])
@ -264,7 +233,7 @@ def start_hispeed_clone(radio, cmd):
def _clone_from_radio(radio): def _clone_from_radio(radio):
md = get_model_data(radio.pipe) md = get_model_data(radio)
if md[0:4] != radio.get_model(): if md[0:4] != radio.get_model():
LOG.info("This model: %s" % util.hexprint(md[0:4])) LOG.info("This model: %s" % util.hexprint(md[0:4]))
@ -274,8 +243,7 @@ def _clone_from_radio(radio):
if radio.is_hispeed(): if radio.is_hispeed():
start_hispeed_clone(radio, CMD_CLONE_OUT) start_hispeed_clone(radio, CMD_CLONE_OUT)
else: else:
send_clone_frame(radio.pipe, CMD_CLONE_OUT, send_clone_frame(radio, CMD_CLONE_OUT, radio.get_model())
radio.get_model(), raw=True)
LOG.debug("Sent clone frame") LOG.debug("Sent clone frame")
@ -291,7 +259,7 @@ def _clone_from_radio(radio):
for frame in frames: for frame in frames:
if frame.cmd == CMD_CLONE_DAT: if frame.cmd == CMD_CLONE_DAT:
src, dst = process_data_frame(frame, _mmap) src, dst = process_data_frame(radio, frame, _mmap)
if last_size != (dst - src): if last_size != (dst - src):
LOG.debug("ICF Size change from %i to %i at %04x" % LOG.debug("ICF Size change from %i to %i at %04x" %
(last_size, dst - src, src)) (last_size, dst - src, src))
@ -342,7 +310,7 @@ def send_mem_chunk(radio, start, stop, bs=32):
chunk = struct.pack(">HB", i, size) chunk = struct.pack(">HB", i, size)
chunk += _mmap[i:i+size] chunk += _mmap[i:i+size]
send_clone_frame(radio.pipe, send_clone_frame(radio,
CMD_CLONE_DAT, CMD_CLONE_DAT,
chunk, chunk,
checksum=True) checksum=True)
@ -360,22 +328,22 @@ def _clone_to_radio(radio):
# Uncomment to save out a capture of what we actually write to the radio # Uncomment to save out a capture of what we actually write to the radio
# SAVE_PIPE = file("pipe_capture.log", "w", 0) # SAVE_PIPE = file("pipe_capture.log", "w", 0)
md = get_model_data(radio.pipe) md = get_model_data(radio)
if md[0:4] != radio.get_model(): if md[0:4] != radio.get_model():
raise errors.RadioError("I can't talk to this model") raise errors.RadioError("I can't talk to this model")
# This mimics what the Icom software does, but isn't required and just # This mimics what the Icom software does, but isn't required and just
# takes longer # takes longer
# md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00") # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00")
# md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00") # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00")
stream = RadioStream(radio.pipe) stream = RadioStream(radio.pipe)
if radio.is_hispeed(): if radio.is_hispeed():
start_hispeed_clone(radio, CMD_CLONE_IN) start_hispeed_clone(radio, CMD_CLONE_IN)
else: else:
send_clone_frame(radio.pipe, CMD_CLONE_IN, radio.get_model(), raw=True) send_clone_frame(radio, CMD_CLONE_IN, radio.get_model())
frames = [] frames = []
@ -384,7 +352,7 @@ def _clone_to_radio(radio):
break break
frames += stream.get_frames() frames += stream.get_frames()
send_clone_frame(radio.pipe, CMD_CLONE_END, radio.get_endframe(), raw=True) send_clone_frame(radio, CMD_CLONE_END, radio.get_endframe())
if SAVE_PIPE: if SAVE_PIPE:
SAVE_PIPE.close() SAVE_PIPE.close()
@ -409,6 +377,7 @@ def clone_to_radio(radio):
try: try:
return _clone_to_radio(radio) return _clone_to_radio(radio)
except Exception, e: except Exception, e:
logging.exception("Failed to communicate with the radio")
raise errors.RadioError("Failed to communicate with the radio: %s" % e) raise errors.RadioError("Failed to communicate with the radio: %s" % e)
@ -579,6 +548,13 @@ class IcomIndexedBankModel(IcomBankModel,
raise errors.RadioError("Out of slots in this bank") raise errors.RadioError("Out of slots in this bank")
def compute_checksum(data):
cs = 0
for byte in data:
cs += ord(byte)
return ((cs ^ 0xFFFF) + 1) & 0xFF
class IcomCloneModeRadio(chirp_common.CloneModeRadio): class IcomCloneModeRadio(chirp_common.CloneModeRadio):
"""Base class for Icom clone-mode radios""" """Base class for Icom clone-mode radios"""
VENDOR = "Icom" VENDOR = "Icom"
@ -612,6 +588,31 @@ class IcomCloneModeRadio(chirp_common.CloneModeRadio):
"""Returns the ranges this radio likes to have in a clone""" """Returns the ranges this radio likes to have in a clone"""
return cls._ranges return cls._ranges
def process_frame_payload(self, payload):
"""Convert BCD-encoded data to raw"""
bcddata = payload
data = ""
i = 0
while i+1 < len(bcddata):
try:
val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16)
i += 2
data += struct.pack("B", val)
except ValueError, e:
LOG.error("Failed to parse byte: %s" % e)
break
return data
def get_payload(self, data, checksum):
"""Returns the data with optional checksum BCD-encoded for the radio."""
payload = ""
for byte in data:
payload += "%02X" % ord(byte)
if checksum:
payload += "%02X" % compute_checksum(data)
return payload
def sync_in(self): def sync_in(self):
self._mmap = clone_from_radio(self) self._mmap = clone_from_radio(self)
self.process_mmap() self.process_mmap()
@ -646,6 +647,67 @@ class IcomCloneModeRadio(chirp_common.CloneModeRadio):
return honor_speed_switch_setting(self, settings) return honor_speed_switch_setting(self, settings)
def flip_high_order_bit(data):
return [chr(ord(d) ^ 0x80) for d in list(data)]
def escape_raw_byte(byte):
"""Escapes a raw byte for sending to the radio"""
# Certain bytes are used as control characters to the radio, so if one of
# these bytes is present in the stream to the radio, it gets escaped as
# 0xff followed by (byte & 0x0f)
if ord(byte) > 0xf9:
return "\xff%s" % (chr(ord(byte) & 0xf))
return byte
def unescape_raw_bytes(escaped_data):
"""Unescapes raw bytes from the radio."""
data = ""
i = 0
while i < len(escaped_data):
byte = escaped_data[i]
if byte == '\xff':
if i + 1 >= len(escaped_data):
raise errors.InvalidDataError(
"Unexpected escape character at end of data")
i += 1
byte = chr(0xf0 | ord(escaped_data[i]))
data += byte
i += 1
return data
class IcomRawCloneModeRadio(IcomCloneModeRadio):
"""Subclass for Icom clone-mode radios using the raw data protocol."""
def process_frame_payload(self, payload):
"""Payloads from a raw-clone-mode radio are already in raw format."""
return unescape_raw_bytes(payload)
def get_payload(self, data, checksum):
"""Returns the data with optional checksum in raw format."""
if checksum:
cs = chr(compute_checksum(data))
else:
cs = ""
payload = "%s%s" % (data, cs)
# Escape control characters.
escaped_payload = [escape_raw_byte(b) for b in payload]
return "".join(escaped_payload)
def sync_in(self):
# The radio returns all the bytes with the high-order bit flipped.
_mmap = clone_from_radio(self)
_mmap = flip_high_order_bit(_mmap.get_packed())
self._mmap = memmap.MemoryMap(_mmap)
self.process_mmap()
def get_mmap(self):
_data = flip_high_order_bit(self._mmap.get_packed())
return memmap.MemoryMap(_data)
class IcomLiveRadio(chirp_common.LiveRadio): class IcomLiveRadio(chirp_common.LiveRadio):
"""Base class for an Icom Live-mode radio""" """Base class for an Icom Live-mode radio"""
VENDOR = "Icom" VENDOR = "Icom"