mirror of
https://github.com/kk7ds/chirp.git
synced 2024-09-22 02:57:20 +00:00
Move the core import logic to its own module so that we can use it for tests
This commit is contained in:
parent
98d37dce1b
commit
2d14840c4f
122
chirp/import_logic.py
Normal file
122
chirp/import_logic.py
Normal file
@ -0,0 +1,122 @@
|
||||
#!/usr/bin/python
|
||||
#
|
||||
# Copyright 2011 Dan Smith <dsmith@danplanet.com>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from chirp import chirp_common, errors
|
||||
|
||||
class ImportError(Exception):
|
||||
pass
|
||||
|
||||
class DestNotCompatible(ImportError):
|
||||
pass
|
||||
|
||||
def ensure_has_calls(radio, memory):
|
||||
ulist_changed = rlist_changed = False
|
||||
|
||||
ulist = radio.get_urcall_list()
|
||||
rlist = radio.get_repeater_call_list()
|
||||
|
||||
if memory.dv_urcall and memory.dv_urcall not in ulist:
|
||||
for i in range(0, len(ulist)):
|
||||
if not ulist[i].strip():
|
||||
ulist[i] = memory.dv_urcall
|
||||
ulist_changed = True
|
||||
break
|
||||
if not ulist_changed:
|
||||
raise ImportError("No room to add callsign %s" % memory.dv_urcall)
|
||||
|
||||
rlist_add = []
|
||||
if memory.dv_rpt1call and memory.dv_rpt1call not in rlist:
|
||||
rlist_add.append(memory.dv_rpt1call)
|
||||
if memory.dv_rpt2call and memory.dv_rpt2call not in rlist:
|
||||
rlist_add.append(memory.dv_rpt2call)
|
||||
|
||||
while rlist_add:
|
||||
call = rlist_add.pop()
|
||||
for i in range(0, len(rlist)):
|
||||
if not rlist[i].strip():
|
||||
rlist[i] = call
|
||||
call = None
|
||||
rlist_changed = True
|
||||
break
|
||||
if call:
|
||||
raise errors.RadioError("No room to add callsign %s" % call)
|
||||
|
||||
if ulist_changed:
|
||||
radio.set_urcall_list(ulist)
|
||||
if rlist_changed:
|
||||
radio.set_repeater_cal_list(rlist)
|
||||
|
||||
# Filter the name according to the destination's rules
|
||||
def _import_name(dst_radio, mem):
|
||||
mem.name = dst_radio.filter_name(mem.name)
|
||||
|
||||
# If the bank is out of range for the destination, revert to "no bank"
|
||||
def _import_bank(dst_radio, mem):
|
||||
nbanks = len(dst_radio.get_banks())
|
||||
if mem.bank >= nbanks:
|
||||
mem.bank = None
|
||||
|
||||
def _import_power(dst_radio, mem):
|
||||
levels = dst_radio.get_features().valid_power_levels
|
||||
if not levels:
|
||||
mem.power = None
|
||||
return
|
||||
elif not mem.power:
|
||||
# Source radio did not support power levels, so choose the
|
||||
# first (highest) level from the destination radio.
|
||||
mem.power = levels[0]
|
||||
return
|
||||
|
||||
# If both radios support power levels, we need to decide how to
|
||||
# convert the source power level to a valid one for the destination
|
||||
# radio. To do that, find the absolute level of the source value
|
||||
# and calculate the different between it and all the levels of the
|
||||
# destination, choosing the one that matches most closely.
|
||||
|
||||
deltas = [abs(mem.power - power) for power in levels]
|
||||
mem.power = levels[deltas.index(min(deltas))]
|
||||
|
||||
def import_mem(dst_radio, src_mem, overrides={}):
|
||||
dst_rf = dst_radio.get_features()
|
||||
|
||||
if isinstance(src_mem, chirp_common.DVMemory):
|
||||
if not isinstance(dst_radio, chirp_common.IcomDstarSupport):
|
||||
raise DestNotCompatible("Destination radio does not support D-STAR")
|
||||
if dst_rf.requires_call_lists:
|
||||
ensure_has_calls(dst_radio, src_mem)
|
||||
|
||||
dst_mem = src_mem.dupe()
|
||||
|
||||
helpers = [_import_name,
|
||||
_import_bank,
|
||||
_import_power,
|
||||
]
|
||||
|
||||
for helper in helpers:
|
||||
helper(dst_radio, dst_mem)
|
||||
|
||||
for k, v in overrides.items():
|
||||
dst_mem.__dict__[k] = v
|
||||
|
||||
msgs = dst_radio.validate_memory(dst_mem)
|
||||
errors = [x for x in msgs if isinstance(x, chirp_common.ValidationError)]
|
||||
if errors:
|
||||
raise DestNotCompatible("Unable to create import memory: %s" %\
|
||||
", ".join(errors))
|
||||
|
||||
return dst_mem
|
||||
|
@ -19,7 +19,7 @@ import gtk
|
||||
import gobject
|
||||
import pango
|
||||
|
||||
from chirp import errors, chirp_common, xml
|
||||
from chirp import errors, chirp_common, xml, import_logic
|
||||
from chirpui import common
|
||||
|
||||
class WaitWindow(gtk.Window):
|
||||
@ -220,52 +220,26 @@ class ImportDialog(gtk.Dialog):
|
||||
|
||||
def do_import(self, dst_rthread):
|
||||
i = 0
|
||||
|
||||
error_messages = {}
|
||||
|
||||
import_list = self.get_import_list()
|
||||
|
||||
has_dstar = isinstance(self.dst_radio, chirp_common.IcomDstarSupport)
|
||||
dst_features = self.dst_radio.get_features()
|
||||
src_features = self.src_radio.get_features()
|
||||
|
||||
if has_dstar and dst_features.requires_call_lists:
|
||||
self.ensure_calls(dst_rthread, import_list)
|
||||
|
||||
dst_banks = self.dst_radio.get_banks()
|
||||
|
||||
for old, new in import_list:
|
||||
i += 1
|
||||
print "%sing %i -> %i" % (self.ACTION, old, new)
|
||||
mem = self.src_radio.get_memory(old).dupe()
|
||||
mem.number = new
|
||||
|
||||
self.do_soft_conversions(dst_features, src_features, mem)
|
||||
src = self.src_radio.get_memory(old)
|
||||
|
||||
mem.name = self.dst_radio.filter_name(mem.name)
|
||||
try:
|
||||
mem = import_logic.import_mem(self.dst_radio, src,
|
||||
{"number" : new})
|
||||
except import_logic.ImportError, e:
|
||||
print e
|
||||
error_messages[new] = str(e)
|
||||
continue
|
||||
|
||||
if dst_banks and not mem.bank < len(dst_banks):
|
||||
mem.bank = None
|
||||
|
||||
if isinstance(mem, chirp_common.DVMemory):
|
||||
if mem.dv_rpt1call == "*NOTUSE*":
|
||||
mem.dv_rpt1call = ""
|
||||
if mem.dv_rpt2call == "*NOTUSE*":
|
||||
mem.dv_rpt2call = ""
|
||||
|
||||
msgs = dst_rthread.radio.validate_memory(mem)
|
||||
if msgs:
|
||||
error_messages[mem.number] = msgs
|
||||
else:
|
||||
job = common.RadioJob(None, "set_memory", mem)
|
||||
job.set_desc("Setting memory %i" % mem.number)
|
||||
dst_rthread._qsubmit(job, 0)
|
||||
|
||||
try:
|
||||
self.do_import_banks()
|
||||
except Exception, e:
|
||||
common.log_exception()
|
||||
error_messages["Banks"] = [str(e)]
|
||||
job = common.RadioJob(None, "set_memory", mem)
|
||||
job.set_desc("Setting memory %i" % mem.number)
|
||||
dst_rthread._qsubmit(job, 0)
|
||||
|
||||
if error_messages.keys():
|
||||
msg = "Error importing memories:\r\n"
|
||||
|
@ -21,7 +21,7 @@ from optparse import OptionParser
|
||||
# Assume we're running in the tests/ directory of the archive
|
||||
sys.path.insert(0, "../")
|
||||
|
||||
from chirp import chirp_common, directory, csv
|
||||
from chirp import chirp_common, directory, csv, import_logic
|
||||
|
||||
TESTS = {}
|
||||
|
||||
@ -110,6 +110,9 @@ class TestWrapper:
|
||||
self._dst.MODEL,
|
||||
self._dst.VARIANT)
|
||||
|
||||
def get_radio(self):
|
||||
return self._dst
|
||||
|
||||
class TestCase:
|
||||
def __init__(self, wrapper):
|
||||
self._wrapper = wrapper
|
||||
@ -183,17 +186,24 @@ class TestCaseCopyAll(TestCase):
|
||||
if src_mem.empty:
|
||||
continue
|
||||
|
||||
src_mem.number = dst_number
|
||||
dst_mem = src_mem.dupe()
|
||||
|
||||
if self._wrapper.do("validate_memory", dst_mem):
|
||||
continue # Returning some messages means "no"
|
||||
try:
|
||||
dst_mem = import_logic.import_mem(self._wrapper.get_radio(),
|
||||
src_mem,
|
||||
overrides={"number":dst_number})
|
||||
except import_logic.DestNotCompatible:
|
||||
continue
|
||||
except import_logic.ImportError, e:
|
||||
failures.append(TestFailedError("<%i>: Import Failed: %s" %\
|
||||
(dst_number, e)))
|
||||
continue
|
||||
except Exception, e:
|
||||
raise TestCrashError(get_tb(), e, "[Import]")
|
||||
|
||||
self._wrapper.do("set_memory", dst_mem)
|
||||
ret_mem = self._wrapper.do("get_memory", dst_number)
|
||||
|
||||
try:
|
||||
self.compare_mem(src_mem, ret_mem)
|
||||
self.compare_mem(dst_mem, ret_mem)
|
||||
except TestFailedError, e:
|
||||
failures.append(TestFailedError("<%i>: %s" % (number, e), e.get_detail()))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user