add some functions

This commit is contained in:
F4HTB 2020-09-27 04:40:13 +01:00
parent 6d63fa1ff0
commit 16f8c6bb33
23 changed files with 1973 additions and 69 deletions

117
UHRR
View File

@ -12,6 +12,8 @@ import time
import numpy
import gc
import Hamlib
from opus.decoder import Decoder as OpusDecoder
import datetime
############ Global variables ##################################
CTRX=None
@ -72,11 +74,50 @@ class AudioRXHandler(tornado.websocket.WebSocketHandler):
self.Wavframes = []
gc.collect()
############ websocket for control TX ##############
class AudioTXHandler(tornado.websocket.WebSocketHandler):
def TX_init(self, msg) :
itrate, is_encoded, op_rate, op_frm_dur = [int(i) for i in msg.split(',')]
self.is_encoded = is_encoded
self.decoder = OpusDecoder(op_rate, 1)
self.frame_size = op_frm_dur * op_rate
device = 'plughw:CARD=U0x41e0x30d3,DEV=0'
self.inp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NONBLOCK, channels=1, rate=itrate, format=alsaaudio.PCM_FORMAT_S16_LE, periodsize=2048, device=device)
def open(self):
print('new connection on AudioTXHandler socket.')
def on_message(self, data) :
if str(data).startswith('m:') :
self.TX_init(str(data[2:]))
elif str(data).startswith('s:') :
self.inp.close()
else :
if self.is_encoded :
pcm = self.decoder.decode(data, self.frame_size, False)
self.inp.write(pcm)
gc.collect()
else :
self.inp.write(data)
gc.collect()
def on_close(self):
if(hasattr(self,"inp")):
self.inp.close()
print('connection closed for TX socket')
############ websocket for control TRX ##############
ControlTRXHandlerClients = []
class TRXRIG:
def __init__(self):
self.spoints = {"0":-54, "1":-48, "2":-42, "3":-36, "4":-30, "5":-24, "6":-18, "7":-12, "8":-6, "9":0, "10":10, "20":20, "30":30, "40":40, "50":50, "60":60}
self.infos = {}
self.serialport = Hamlib.hamlib_port_parm_serial
self.serialport.rate=38400
@ -94,6 +135,13 @@ class TRXRIG:
self.getFreq()
self.getMode()
def parsedbtospoint(self,spoint):
for key, value in self.spoints.items():
if (spoint<value):
return key
break
def getvfo(self):
try:
self.infos["VFO"] = (self.rig.get_vfo())
@ -125,18 +173,6 @@ class TRXRIG:
logging.error("Could not set the mode via Hamlib!")
return self.infos["MODE"]
def setPTT(self,status):
try:
if status:
self.rig.set_ptt(Hamlib.RIG_VFO_CURR,Hamlib.RIG_PTT_ON)
self.infos["PTT"]="PTT_OFF"
else:
self.rig.set_ptt(Hamlib.RIG_VFO_CURR,Hamlib.RIG_PTT_OFF)
self.infos["PTT"]="PTT_ON"
except:
logging.error("Could not set the mode via Hamlib!")
return self.infos["PTT"]
def getMode(self):
try:
(mode, width) = self.rig.get_mode()
@ -148,11 +184,27 @@ class TRXRIG:
def getStrgLVL(self):
try:
self.infos["StrgLVL"] = self.rig.get_level_i(Hamlib.RIG_LEVEL_STRENGTH)
self.infos["StrgLVLi"] = self.rig.get_level_i(Hamlib.RIG_LEVEL_STRENGTH)
self.infos["StrgLVL"] = self.parsedbtospoint(self.infos["StrgLVLi"])
except:
logging.error("Could not obtain the current Strength signal RX level via Hamlib!")
return self.infos["StrgLVL"]
def setPTT(self,status):
try:
if status == "true":
self.rig.set_ptt(Hamlib.RIG_VFO_CURR,Hamlib.RIG_PTT_ON)
self.infos["PTT"]=True
else:
self.rig.set_ptt(Hamlib.RIG_VFO_CURR,Hamlib.RIG_PTT_OFF)
self.infos["PTT"]=False
except:
logging.error("Could not set the mode via Hamlib!")
return self.infos["PTT"]
def getPTT(self,status):
return self.infos["PTT"]
def setPower(self,status=1):
try:
if status:
@ -164,10 +216,33 @@ class TRXRIG:
logging.error("Could not set power status via Hamlib!")
return self.infos["powerstat"]
class ticksTRXRIG(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
CTRX.getStrgLVL()
time.sleep(0.5)
class ControlTRX(tornado.websocket.WebSocketHandler):
def sendPTINFOS(self):
try:
if self.StrgLVL != CTRX.infos["StrgLVL"]:
self.write_message("getSignalLevel:"+str(CTRX.infos["StrgLVL"]))
self.StrgLVL=CTRX.infos["StrgLVL"]
except:
print("error TXMETER")
return None
tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(seconds=0.5), self.sendPTINFOS)
def open(self):
if self not in ControlTRXHandlerClients:
ControlTRXHandlerClients.append(self)
self.StrgLVL=0
self.sendPTINFOS()
print('new connection on ControlTRX socket.')
@tornado.gen.coroutine
@ -186,6 +261,12 @@ class ControlTRX(tornado.websocket.WebSocketHandler):
yield self.write_message("getFreq:"+str(CTRX.getFreq())) #CTRX()[data]()
elif(action == "setFreq"):
yield self.write_message("getFreq:"+str(CTRX.setFreq(datato)))
elif(action == "getMode"):
yield self.write_message("getMode:"+str(CTRX.getMode()))
elif(action == "setMode"):
yield self.write_message("getMode:"+str(CTRX.setMode(datato)))
elif(action == "setPTT"):
yield self.write_message("getPTT:"+str(CTRX.setPTT(datato)))
def on_close(self):
@ -204,16 +285,20 @@ if __name__ == "__main__":
threadloadWavdata = loadWavdata()
threadloadWavdata.start()
CTRX = TRXRIG()
CTRX.setPower(1)
threadticksTRXRIG = ticksTRXRIG()
threadticksTRXRIG.start()
app = tornado.web.Application([
(r'/CTRX', ControlTRX),
(r'/audioRX', AudioRXHandler),
(r'/audioTX', AudioTXHandler),
(r'/CTRX', ControlTRX),
(r'/', MainHandler),
(r'/(.*)', tornado.web.StaticFileHandler, { 'path' : './www' })
],debug=True)
],debug=True, websocket_ping_interval=10)
http_server = tornado.httpserver.HTTPServer(app, ssl_options={
"certfile": os.path.join("selfsign.crt"),

24
opus/LICENSE Normal file
View File

@ -0,0 +1,24 @@
Copyright (c) 2012, SvartalF
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the SvartalF nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

1
opus/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Python bindings to the libopus, IETF low-delay audio codec"""

Binary file not shown.

Binary file not shown.

Binary file not shown.

9
opus/api/__init__.py Normal file
View File

@ -0,0 +1,9 @@
import ctypes
from ctypes.util import find_library
libopus = ctypes.CDLL(find_library('opus'))
c_int_pointer = ctypes.POINTER(ctypes.c_int)
c_int16_pointer = ctypes.POINTER(ctypes.c_int16)
c_float_pointer = ctypes.POINTER(ctypes.c_float)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

71
opus/api/constants.py Normal file
View File

@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
"""Matches to `opus_defines.h`"""
# No Error
OK = 0
# One or more invalid/out of range arguments
BAD_ARG = -1
# The mode struct passed is invalid
BUFFER_TOO_SMALL = -2
# The compressed data passed is corrupted
INVALID_PACKET = -4
# Invalid/unsupported request number
UNIMPLEMENTED = -5
# Pre-defined values for CTL interface
APPLICATION_VOIP = 2048
APPLICATION_AUDIO = 2049
APPLICATION_RESTRICTED_LOWDELAY = 2051
SIGNAL_MUSIC = 3002
# Values for the various encoder CTLs
SET_APPLICATION_REQUEST = 4000
GET_APPLICATION_REQUEST = 4001
SET_BITRATE_REQUEST = 4002
GET_BITRATE_REQUEST = 4003
SET_MAX_BANDWIDTH_REQUEST = 4004
GET_MAX_BANDWIDTH_REQUEST = 4005
SET_VBR_REQUEST = 4006
GET_VBR_REQUEST = 4007
SET_BANDWIDTH_REQUEST = 4008
GET_BANDWIDTH_REQUEST = 4009
SET_COMPLEXITY_REQUEST = 4010
GET_COMPLEXITY_REQUEST = 4011
SET_INBAND_FEC_REQUEST = 4012
GET_INBAND_FEC_REQUEST = 4013
SET_PACKET_LOSS_PERC_REQUEST = 4014
GET_PACKET_LOSS_PERC_REQUEST = 4015
SET_DTX_REQUEST = 4016
GET_DTX_REQUEST = 4017
SET_VBR_CONSTRAINT_REQUEST = 4020
GET_VBR_CONSTRAINT_REQUEST = 4021
SET_FORCE_CHANNELS_REQUEST = 4022
GET_FORCE_CHANNELS_REQUEST = 4023
SET_SIGNAL_REQUEST = 4024
GET_SIGNAL_REQUEST = 4025
GET_LOOKAHEAD_REQUEST = 4027
RESET_STATE = 4028
GET_SAMPLE_RATE_REQUEST = 4029
GET_FINAL_RANGE_REQUEST = 4031
GET_PITCH_REQUEST = 4033
SET_GAIN_REQUEST = 4034
GET_GAIN_REQUEST = 4045
SET_LSB_DEPTH_REQUEST = 4036
GET_LSB_DEPTH_REQUEST = 4037
AUTO = -1000
BANDWIDTH_NARROWBAND = 1101
BANDWIDTH_MEDIUMBAND = 1102
BANDWIDTH_WIDEBAND = 1103
BANDWIDTH_SUPERWIDEBAND = 1104
BANDWIDTH_FULLBAND = 1105

173
opus/api/ctl.py Normal file
View File

@ -0,0 +1,173 @@
"""CTL macros rewritten to Python
Usage example:
from opus.api import decoder, ctl
dec = decoder.create(48000, 2)
decoder.ctl(dec, ctl.set_gain, -15)
gain_value = decoder.ctl(dec, ctl.get_gain)
"""
import ctypes
from opus.api import constants
from opus.exceptions import OpusError
def query(request):
"""Query encoder/decoder with a request value"""
def inner(func, obj):
result_code = func(obj, request)
if result_code is not constants.OK:
raise OpusError(result_code)
return result_code
return inner
def get(request, result_type):
"""Get CTL value from a encoder/decoder"""
def inner(func, obj):
result = result_type()
result_code = func(obj, request, ctypes.byref(result))
if result_code is not constants.OK:
raise OpusError(result_code)
return result.value
return inner
def set(request):
"""Set new CTL value to a encoder/decoder"""
def inner(func, obj, value):
result_code = func(obj, request, value)
if result_code is not constants.OK:
raise OpusError(result_code)
return inner
#
# Generic CTLs
#
# Resets the codec state to be equivalent to a freshly initialized state
reset_state = query(constants.RESET_STATE)
# Gets the final state of the codec's entropy coder
get_final_range = get(constants.GET_FINAL_RANGE_REQUEST, ctypes.c_uint)
# Gets the encoder's configured bandpass or the decoder's last bandpass
get_bandwidth = get(constants.GET_BANDWIDTH_REQUEST, ctypes.c_int)
# Gets the pitch of the last decoded frame, if available
get_pitch = get(constants.GET_PITCH_REQUEST, ctypes.c_int)
# Configures the depth of signal being encoded
set_lsb_depth = set(constants.SET_LSB_DEPTH_REQUEST)
# Gets the encoder's configured signal depth
get_lsb_depth = get(constants.GET_LSB_DEPTH_REQUEST, ctypes.c_int)
#
# Decoder related CTLs
#
# Gets the decoder's configured gain adjustment
get_gain = get(constants.GET_GAIN_REQUEST, ctypes.c_int)
# Configures decoder gain adjustment
set_gain = set(constants.SET_GAIN_REQUEST)
#
# Encoder related CTLs
#
# Configures the encoder's computational complexity
set_complexity = set(constants.SET_COMPLEXITY_REQUEST)
# Gets the encoder's complexity configuration
get_complexity = get(constants.GET_COMPLEXITY_REQUEST, ctypes.c_int)
# Configures the bitrate in the encoder
set_bitrate = set(constants.SET_BITRATE_REQUEST)
# Gets the encoder's bitrate configuration
get_bitrate = get(constants.GET_BITRATE_REQUEST, ctypes.c_int)
# Enables or disables variable bitrate (VBR) in the encoder
set_vbr = set(constants.SET_VBR_REQUEST)
# Determine if variable bitrate (VBR) is enabled in the encoder
get_vbr = get(constants.GET_VBR_REQUEST, ctypes.c_int)
# Enables or disables constrained VBR in the encoder
set_vbr_constraint = set(constants.SET_VBR_CONSTRAINT_REQUEST)
# Determine if constrained VBR is enabled in the encoder
get_vbr_constraint = get(constants.GET_VBR_CONSTRAINT_REQUEST, ctypes.c_int)
# Configures mono/stereo forcing in the encoder
set_force_channels = set(constants.SET_FORCE_CHANNELS_REQUEST)
# Gets the encoder's forced channel configuration
get_force_channels = get(constants.GET_FORCE_CHANNELS_REQUEST, ctypes.c_int)
# Configures the maximum bandpass that the encoder will select automatically
set_max_bandwidth = set(constants.SET_MAX_BANDWIDTH_REQUEST)
# Gets the encoder's configured maximum allowed bandpass
get_max_bandwidth = get(constants.GET_MAX_BANDWIDTH_REQUEST, ctypes.c_int)
# Sets the encoder's bandpass to a specific value
set_bandwidth = set(constants.SET_BANDWIDTH_REQUEST)
# Configures the type of signal being encoded
set_signal = set(constants.SET_SIGNAL_REQUEST)
# Gets the encoder's configured signal type
get_signal = get(constants.GET_SIGNAL_REQUEST, ctypes.c_int)
# Configures the encoder's intended application
set_application = set(constants.SET_APPLICATION_REQUEST)
# Gets the encoder's configured application
get_application = get(constants.GET_APPLICATION_REQUEST, ctypes.c_int)
# Gets the sampling rate the encoder or decoder was initialized with
get_sample_rate = get(constants.GET_SAMPLE_RATE_REQUEST, ctypes.c_int)
# Gets the total samples of delay added by the entire codec
get_lookahead = get(constants.GET_LOOKAHEAD_REQUEST, ctypes.c_int)
# Configures the encoder's use of inband forward error correction (FEC)
set_inband_fec = set(constants.SET_INBAND_FEC_REQUEST)
# Gets encoder's configured use of inband forward error correction
get_inband_fec = get(constants.GET_INBAND_FEC_REQUEST, ctypes.c_int)
# Configures the encoder's expected packet loss percentage
set_packet_loss_perc = set(constants.SET_PACKET_LOSS_PERC_REQUEST)
# Gets the encoder's configured packet loss percentage
get_packet_loss_perc = get(constants.GET_PACKET_LOSS_PERC_REQUEST, ctypes.c_int)
# Configures the encoder's use of discontinuous transmission (DTX)
set_dtx = set(constants.SET_DTX_REQUEST)
# Gets encoder's configured use of discontinuous transmission
get_dtx = get(constants.GET_DTX_REQUEST, ctypes.c_int)
#
# Other stuff
#
unimplemented = query(constants.UNIMPLEMENTED)

187
opus/api/decoder.py Normal file
View File

@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
import array
import ctypes
from opus.api import libopus, c_int_pointer, c_int16_pointer, c_float_pointer
from opus.exceptions import OpusError
class Decoder(ctypes.Structure):
"""Opus decoder state.
This contains the complete state of an Opus decoder.
"""
pass
DecoderPointer = ctypes.POINTER(Decoder)
get_size = libopus.opus_decoder_get_size
get_size.argtypes = (ctypes.c_int,)
get_size.restype = ctypes.c_int
get_size.__doc__ = 'Gets the size of an OpusDecoder structure'
_create = libopus.opus_decoder_create
_create.argtypes = (ctypes.c_int, ctypes.c_int, c_int_pointer)
_create.restype = DecoderPointer
def create(fs, channels):
"""Allocates and initializes a decoder state"""
result_code = ctypes.c_int()
result = _create(fs, channels, ctypes.byref(result_code))
if result_code.value is not 0:
raise OpusError(result_code.value)
return result
_packet_get_bandwidth = libopus.opus_packet_get_bandwidth
_packet_get_bandwidth.argtypes = (ctypes.c_char_p,)
_packet_get_bandwidth.restype = ctypes.c_int
def packet_get_bandwidth(data):
"""Gets the bandwidth of an Opus packet."""
data_pointer = ctypes.c_char_p(data)
result = _packet_get_bandwidth(data_pointer)
if result < 0:
raise OpusError(result)
return result
_packet_get_nb_channels = libopus.opus_packet_get_nb_channels
_packet_get_nb_channels.argtypes = (ctypes.c_char_p,)
_packet_get_nb_channels.restype = ctypes.c_int
def packet_get_nb_channels(data):
"""Gets the number of channels from an Opus packet"""
data_pointer = ctypes.c_char_p(data)
result = _packet_get_nb_channels(data_pointer)
if result < 0:
raise OpusError(result)
return result
_packet_get_nb_frames = libopus.opus_packet_get_nb_frames
_packet_get_nb_frames.argtypes = (ctypes.c_char_p, ctypes.c_int)
_packet_get_nb_frames.restype = ctypes.c_int
def packet_get_nb_frames(data, length=None):
"""Gets the number of frames in an Opus packet"""
data_pointer = ctypes.c_char_p(data)
if length is None:
length = len(data)
result = _packet_get_nb_frames(data_pointer, ctypes.c_int(length))
if result < 0:
raise OpusError(result)
return result
_packet_get_samples_per_frame = libopus.opus_packet_get_samples_per_frame
_packet_get_samples_per_frame.argtypes = (ctypes.c_char_p, ctypes.c_int)
_packet_get_samples_per_frame.restype = ctypes.c_int
def packet_get_samples_per_frame(data, fs):
"""Gets the number of samples per frame from an Opus packet"""
data_pointer = ctypes.c_char_p(data)
result = _packet_get_nb_frames(data_pointer, ctypes.c_int(fs))
if result < 0:
raise OpusError(result)
return result
_get_nb_samples = libopus.opus_decoder_get_nb_samples
_get_nb_samples.argtypes = (DecoderPointer, ctypes.c_char_p, ctypes.c_int32)
_get_nb_samples.restype = ctypes.c_int
def get_nb_samples(decoder, packet, length):
result = _get_nb_samples(decoder, packet, length)
if result < 0:
raise OpusError(result)
return result
_decode = libopus.opus_decode
_decode.argtypes = (DecoderPointer, ctypes.c_char_p, ctypes.c_int32, c_int16_pointer, ctypes.c_int, ctypes.c_int)
_decode.restype = ctypes.c_int
def decode(decoder, data, length, frame_size, decode_fec, channels=2):
"""Decode an Opus frame
Unlike the `opus_decode` function , this function takes an additional parameter `channels`,
which indicates the number of channels in the frame
"""
pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_int16)
pcm = (ctypes.c_int16 * pcm_size)()
pcm_pointer = ctypes.cast(pcm, c_int16_pointer)
# Converting from a boolean to int
decode_fec = int(bool(decode_fec))
result = _decode(decoder, data, length, pcm_pointer, frame_size, decode_fec)
if result < 0:
raise OpusError(result)
return array.array('h', pcm[ :result * channels ]).tostring()
_decode_float = libopus.opus_decode_float
_decode_float.argtypes = (DecoderPointer, ctypes.c_char_p, ctypes.c_int32, c_float_pointer, ctypes.c_int, ctypes.c_int)
_decode_float.restype = ctypes.c_int
def decode_float(decoder, data, length, frame_size, decode_fec, channels=2):
pcm_size = frame_size * channels * ctypes.sizeof(ctypes.c_float)
pcm = (ctypes.c_float * pcm_size)()
pcm_pointer = ctypes.cast(pcm, c_float_pointer)
# Converting from a boolean to int
decode_fec = int(bool(decode_fec))
result = _decode_float(decoder, data, length, pcm_pointer, frame_size, decode_fec)
if result < 0:
raise OpusError(result)
return array.array('f', pcm[ : result * channels ]).tostring()
_ctl = libopus.opus_decoder_ctl
_ctl.restype = ctypes.c_int
def ctl(decoder, request, value=None):
if value is not None:
return request(_ctl, decoder, value)
return request(_ctl, decoder)
destroy = libopus.opus_decoder_destroy
destroy.argtypes = (DecoderPointer,)
destroy.restype = None
destroy.__doc__ = 'Frees an OpusDecoder allocated by opus_decoder_create()'

105
opus/api/encoder.py Normal file
View File

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
import ctypes
import array
from opus.api import constants, libopus, c_int_pointer, c_int16_pointer, c_float_pointer
from opus.exceptions import OpusError
class Encoder(ctypes.Structure):
"""Opus encoder state.
This contains the complete state of an Opus encoder.
"""
pass
EncoderPointer = ctypes.POINTER(Encoder)
_get_size = libopus.opus_encoder_get_size
_get_size.argtypes = (ctypes.c_int,)
_get_size.restype = ctypes.c_int
def get_size(channels):
"""Gets the size of an OpusEncoder structure."""
if not channels in (1, 2):
raise ValueError('Wrong channels value. Must be equal to 1 or 2')
return _get_size(channels)
_create = libopus.opus_encoder_create
_create.argtypes = (ctypes.c_int, ctypes.c_int, ctypes.c_int, c_int_pointer)
_create.restype = EncoderPointer
def create(fs, channels, application):
"""Allocates and initializes an encoder state."""
result_code = ctypes.c_int()
result = _create(fs, channels, application, ctypes.byref(result_code))
if result_code.value is not constants.OK:
raise OpusError(result_code.value)
return result
_ctl = libopus.opus_encoder_ctl
_ctl.restype = ctypes.c_int
def ctl(encoder, request, value=None):
if value is not None:
return request(_ctl, encoder, value)
return request(_ctl, encoder)
_encode = libopus.opus_encode
_encode.argtypes = (EncoderPointer, c_int16_pointer, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32)
_encode.restype = ctypes.c_int32
def encode(encoder, pcm, frame_size, max_data_bytes):
"""Encodes an Opus frame
Returns string output payload
"""
pcm = ctypes.cast(pcm, c_int16_pointer)
data = (ctypes.c_char * max_data_bytes)()
result = _encode(encoder, pcm, frame_size, data, max_data_bytes)
if result < 0:
raise OpusError(result)
return array.array('c', data[:result]).tostring()
_encode_float = libopus.opus_encode_float
_encode_float.argtypes = (EncoderPointer, c_float_pointer, ctypes.c_int, ctypes.c_char_p, ctypes.c_int32)
_encode_float.restype = ctypes.c_int32
def encode_float(encoder, pcm, frame_size, max_data_bytes):
"""Encodes an Opus frame from floating point input"""
pcm = ctypes.cast(pcm, c_float_pointer)
data = (ctypes.c_char * max_data_bytes)()
result = _encode_float(encoder, pcm, frame_size, data, max_data_bytes)
if result < 0:
raise OpusError(result)
return array.array('c', data[:result]).tostring()
destroy = libopus.opus_encoder_destroy
destroy.argtypes = (EncoderPointer,)
destroy.restype = None
destroy.__doc__ = "Frees an OpusEncoder allocated by opus_encoder_create()"

17
opus/api/info.py Normal file
View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
import ctypes
from opus.api import libopus
strerror = libopus.opus_strerror
strerror.argtypes = (ctypes.c_int,)
strerror.restype = ctypes.c_char_p
strerror.__doc__ = '''Converts an opus error code into a human readable string'''
get_version_string = libopus.opus_get_version_string
get_version_string.argtypes = None
get_version_string.restype = ctypes.c_char_p
get_version_string.__doc__ = 'Gets the libopus version string'

59
opus/decoder.py Normal file
View File

@ -0,0 +1,59 @@
"""High-level interface to a opus.api.decoder functions"""
from opus.api import decoder, ctl
class Decoder(object):
def __init__(self, fs, channels):
"""
Parameters:
fs : sampling rate
channels : number of channels
"""
self._fs = fs
self._channels = channels
self._state = decoder.create(fs, channels)
def __del__(self):
if hasattr(self, '_state'):
# Destroying state only if __init__ completed successfully
decoder.destroy(self._state)
def reset_state(self):
"""Resets the codec state to be equivalent to a freshly initialized state"""
decoder.ctl(self._state, ctl.reset_state)
def decode(self, data, frame_size, decode_fec=False):
return decoder.decode(self._state, data, len(data), frame_size, decode_fec, channels=self._channels)
def decode_float(self, data, frame_size, decode_fec=False):
return decoder.decode_float(self._state, data, len(data), frame_size, decode_fec, channels=self._channels)
# CTL interfaces
_get_final_range = lambda self: decoder.ctl(self._state, ctl.get_final_range)
final_range = property(_get_final_range)
_get_bandwidth = lambda self: decoder.ctl(self._state, ctl.get_bandwidth)
bandwidth = property(_get_bandwidth)
_get_pitch = lambda self: decoder.ctl(self._state, ctl.get_pitch)
pitch = property(_get_pitch)
_get_lsb_depth = lambda self: decoder.ctl(self._state, ctl.get_lsb_depth)
_set_lsb_depth = lambda self, x: decoder.ctl(self._state, ctl.set_lsb_depth, x)
lsb_depth = property(_get_lsb_depth, _set_lsb_depth)
_get_gain = lambda self: decoder.ctl(self._state, ctl.get_gain)
_set_gain = lambda self, x: decoder.ctl(self._state, ctl.set_gain, x)
gain = property(_get_gain, _set_gain)

143
opus/encoder.py Normal file
View File

@ -0,0 +1,143 @@
"""High-level interface to a opus.api.encoder functions"""
from opus.api import encoder, ctl, constants
APPLICATION_TYPES_MAP = {
'voip': constants.APPLICATION_VOIP,
'audio': constants.APPLICATION_AUDIO,
'restricted_lowdelay': constants.APPLICATION_RESTRICTED_LOWDELAY,
}
class Encoder(object):
def __init__(self, fs, channels, application):
"""
Parameters:
fs : sampling rate
channels : number of channels
"""
if application in APPLICATION_TYPES_MAP.keys():
application = APPLICATION_TYPES_MAP[application]
elif application in APPLICATION_TYPES_MAP.values():
pass # Nothing to do here
else:
raise ValueError("`application` value must be in 'voip', 'audio' or 'restricted_lowdelay'")
self._fs = fs
self._channels = channels
self._application = application
self._state = encoder.create(fs, channels, application)
def __del__(self):
if hasattr(self, '_state'):
# Destroying state only if __init__ completed successfully
encoder.destroy(self._state)
def reset_state(self):
"""Resets the codec state to be equivalent to a freshly initialized state"""
encoder.ctl(self._state, ctl.reset_state)
def encode(self, data, frame_size):
return encoder.encode(self._state, data, frame_size, len(data))
def encode_float(self, data, frame_size, decode_fec=False):
return encoder.encode_float(self._state, data, frame_size, len(data))
# CTL interfaces
_get_final_range = lambda self: encoder.ctl(self._state, ctl.get_final_range)
final_range = property(_get_final_range)
_get_bandwidth = lambda self: encoder.ctl(self._state, ctl.get_bandwidth)
bandwidth = property(_get_bandwidth)
_get_pitch = lambda self: encoder.ctl(self._state, ctl.get_pitch)
pitch = property(_get_pitch)
_get_lsb_depth = lambda self: encoder.ctl(self._state, ctl.get_lsb_depth)
_set_lsb_depth = lambda self, x: encoder.ctl(self._state, ctl.set_lsb_depth, x)
lsb_depth = property(_get_lsb_depth, _set_lsb_depth)
_get_complexity = lambda self: encoder.ctl(self._state, ctl.get_complexity)
_set_complexity = lambda self, x: encoder.ctl(self._state, ctl.set_complexity, x)
complexity = property(_get_complexity, _set_complexity)
_get_bitrate = lambda self: encoder.ctl(self._state, ctl.get_bitrate)
_set_bitrate = lambda self, x: encoder.ctl(self._state, ctl.set_bitrate, x)
bitrate = property(_get_bitrate, _set_bitrate)
_get_vbr = lambda self: encoder.ctl(self._state, ctl.get_vbr)
_set_vbr = lambda self, x: encoder.ctl(self._state, ctl.set_vbr, x)
vbr = property(_get_vbr, _set_vbr)
_get_vbr_constraint = lambda self: encoder.ctl(self._state, ctl.get_vbr_constraint)
_set_vbr_constraint = lambda self, x: encoder.ctl(self._state, ctl.set_vbr_constraint, x)
vbr_constraint = property(_get_vbr_constraint, _set_vbr_constraint)
_get_force_channels = lambda self: encoder.ctl(self._state, ctl.get_force_channels)
_set_force_channels = lambda self, x: encoder.ctl(self._state, ctl.set_force_channels, x)
force_channels = property(_get_force_channels, _set_force_channels)
_get_max_bandwidth = lambda self: encoder.ctl(self._state, ctl.get_max_bandwidth)
_set_max_bandwidth = lambda self, x: encoder.ctl(self._state, ctl.set_max_bandwidth, x)
max_bandwidth = property(_get_max_bandwidth, _set_max_bandwidth)
_set_bandwidth = lambda self, x: encoder.ctl(self._state, ctl.set_bandwidth, x)
bandwidth = property(None, _set_bandwidth)
_get_signal = lambda self: encoder.ctl(self._state, ctl.get_signal)
_set_signal = lambda self, x: encoder.ctl(self._state, ctl.set_signal, x)
signal = property(_get_signal, _set_signal)
_get_application = lambda self: encoder.ctl(self._state, ctl.get_application)
_set_application = lambda self, x: encoder.ctl(self._state, ctl.set_application, x)
application = property(_get_application, _set_application)
_get_sample_rate = lambda self: encoder.ctl(self._state, ctl.get_sample_rate)
sample_rate = property(_get_sample_rate)
_get_lookahead = lambda self: encoder.ctl(self._state, ctl.get_lookahead)
lookahead = property(_get_lookahead)
_get_inband_fec = lambda self: encoder.ctl(self._state, ctl.get_inband_fec)
_set_inband_fec = lambda self, x: encoder.ctl(self._state, ctl.set_inband_fec)
inband_fec = property(_get_inband_fec, _set_inband_fec)
_get_packet_loss_perc = lambda self: encoder.ctl(self._state, ctl.get_packet_loss_perc)
_set_packet_loss_perc = lambda self, x: encoder.ctl(self._state, ctl.set_packet_loss_perc, x)
packet_loss_perc = property(_get_packet_loss_perc, _set_packet_loss_perc)
_get_dtx = lambda self: encoder.ctl(self._state, ctl.get_dtx)
_set_dtx = lambda self, x: encoder.ctl(self._state, ctl.get_dtx, x)

10
opus/exceptions.py Normal file
View File

@ -0,0 +1,10 @@
from opus.api.info import strerror
class OpusError(Exception):
def __init__(self, code):
self.code = code
def __str__(self):
return strerror(self.code)

775
www/controls.js vendored

File diff suppressed because one or more lines are too long

View File

@ -3,7 +3,6 @@
<meta charset="UTF-8">
<title>Universal Hamradio Remote by F4HTB</title>
<link rel="stylesheet" type="text/css" href="style.css">
<script src="controls.js"></script>
</head>
</head>
<body onload="bodyload();">
@ -16,6 +15,35 @@
<div id="div-princ">
<img onclick="powertogle();" id="button_power" src="img/poweroff.png">
<form id="RX-GAIN_control">
RX GAIN:<input oninput="AudioRX_SetGAIN(this.value/100);" onchange="AudioRX_SetGAIN(this.value/100);" value="50" step="5" type="range" id="C_af" name="volume" min="0" max="1000">
</form>
<div id="Rxmeters">
<div id="Rxinstant">
<div class="label">RX volume:</div>
<meter low="10" high="70" max="100" value="0"></meter>
<div class="value"></div>
</div>
</div>
<form id="TX-GAIN_control">
TX GAIN:<input oninput="AudioTX_SetGAIN(this.value/100);" onchange="AudioTX_SetGAIN(this.value/100);" value="50" step="5" type="range" id="C_af" name="volume" min="0" max="200">
</form>
<div id="Txmeters">
<div id="Txinstant">
<div class="label">TX volume:</div>
<meter low="10" high="50" max="100" value="0"></meter>
<div class="value"></div>
</div>
</div>
<span onmousedown="if(poweron){button_pressed();toggleRecord(true);toggleaudioRX();sendTRXptt(true);}" onmouseup="button_unpressed();if(poweron){toggleRecord();toggleaudioRX();sendTRXptt(false);}" class="button_unpressed" id="TX-record">TX</span>
<form id="record_opus">
<input type="checkbox" id="encode" alt="Encode TX with opus codec">Encode TX</input><br>
</form>
<div id="div-freq">
@ -60,8 +88,53 @@
</ul>
</div>
<form id="hambands">
<label>Ham Bands</label>
<select id="selecthambands">
<option value="001845500,LSB">160m in LSB</option>
<option value="003692500,LSB"> 80m in LSB</option>
<option value="007092500,LSB"> 40m in LSB</option>
<option value="010140000,USB"> 30m in USB</option>
<option value="014127500,USB"> 20m in USB</option>
<option value="018132500,USB"> 17m in USB</option>
<option value="021287500,USB"> 15m in USB</option>
<option value="024952500,USB"> 12m in USB</option>
<option value="028362500,USB"> 10m in USB</option>
<option value="050202500,USB"> 6m in USB</option>
<option value="070202500,USB"> 4m in USB</option>
<option value="144300000,USB"> 2m in USB</option>
<option value="432200000,USB">70cm in USB</option>
</select>
<label onclick="recall_hambands();">recall</label>
</form>
<form id="personalfrequency">
<label>Personal Freqs</label>
<select id="selectpersonalfrequency">
</select>
<label onclick="recall_freqfromcokkies();">recall</label>
<label onclick="delete_freqfromcokkies();">&nbsp;delete</label>
<label onclick="save_freqtocokkies();">&nbsp;save</label>
</form>
<canvas class="visualizer" id="canBFSPC" width="1024" height="300"></canvas>
<canvas class="visualizer" id="canBFFFT" width="2048" height="140"></canvas>
<div id="div-mode_menu">
<ul>
<li onclick="togle_li();sendTRXmode();" class="button_mode button_pressed" lichecked="">USB</li>
<li onclick="togle_li();sendTRXmode();" class="button_mode button_unpressed">LSB</li>
<li onclick="togle_li();sendTRXmode();" class="button_mode button_unpressed">CW</li>
<li onclick="togle_li();sendTRXmode();" class="button_mode button_unpressed">AM</li>
<li onclick="togle_li();sendTRXmode();" class="button_mode button_unpressed">FM</li>
</ul>
</div>
<canvas id="canRXsmeter" width=250 height=50 ></canvas>
<div id="div-smeterdigitRX">S9+40dB</div>
<div id="div-scoketscontrols">
<p id="indcwsTX"><img src="img/critsred.png">wsTX</p>
<p id="indwsAudioTX"><img src="img/critsred.png">wsTX</p>
<p id="indwsAudioRX"><img src="img/critsred.png">wsRX</p>
<p id="indwsControlTRX"><img src="img/critsred.png">wsCtrl</p>
<p id="indcwsFFT"><img src="img/critsred.png">wsFFT</p>
@ -69,5 +142,6 @@
<div id="div-latencymeter">latency:∞</div>
</div>
<script src="controls.js"></script>
</body>
</html>

View File

@ -7,6 +7,7 @@ body {
-moz-user-select: none;
-ms-user-select: none;
user-select: none;
/* overflow: hidden; */
}
#ombre-body{
@ -44,15 +45,10 @@ overflow: hidden;
#button_power
{
position:absolute;
left:100px;
top:100px;
width:100px;
height:100px;
}
.button_mode{
width:60px;
height: 40px;
left:105px;
top:105px;
width:150px;
height:150px;
}
.button_pressed{
@ -87,28 +83,28 @@ overflow: hidden;
{
position:absolute;
left: 400px;
top:100px;
top:80px;
height: 200px;
width:1000px;
width:700px;
}
#div-freq > ul > li {
display: inline-block;
font:normal bold 20px tahoma;
font:normal bold 45px tahoma;
text-align:center;
width:87px;
width:60px;
}
#freq_disp{
border-radius: 5px 5px 5px 5px ;
box-shadow:0px 2px 2px 0px rgba(0, 0, 0, 0.5) inset,0px 2px 2px 0px rgba(255, 255, 255, 0.5);
background-color: black;
width:1000px;
width:700px;
padding-top:5px;
padding-left:5px;
margin-top:10px;
margin-bottom:10px;
height: 30px;
height: 60px;
}
#freq_disp > ul > li {
@ -125,7 +121,7 @@ overflow: hidden;
}
#freq_but{
width:1000px;
width:700px;
margin:0;
padding-left:5px;
}
@ -150,15 +146,15 @@ overflow: hidden;
#div-scoketscontrols
{
position:absolute;
left: 1350px;
top:825px;
width: 280px;
left: 1100px;
top:800px;
width: 580px;
}
#div-scoketscontrols > p
{
display: inline-block;
font:normal bold 14px tahoma;
font:normal bold 25px tahoma;
text-align:center;
margin-right:10px;
}
@ -166,13 +162,240 @@ overflow: hidden;
#div-scoketscontrols > p > img
{
margin-right:5px;
width:10px;
height:10px;
width:25px;
height:25px;
}
#div-latencymeter
{
font:normal bold 25px tahoma;
position:absolute;
left:1680px;
top:840px;
}
left:1600px;
top:825px;
}
#RX-GAIN_control
{
width:200px;
height:140px;
position:absolute;
left:80px;
top:300px;
font:normal bold 25px tahoma;
text-align:center;
}
div#Txmeters > div {
text-align:center;
width:200px;
position:absolute;
left: 80px;
top:600px;
font:normal bold 25px tahoma;
}
div#Txmeters div.label {
display: inline-block;
}
div#Txmeters div.value {
display: inline-block;
}
div#Rxmeters > div {
text-align:center;
width:200px;
position:absolute;
left: 80px;
top:380px;
font:normal bold 25px tahoma;
}
div#Rxmeters div.label {
display: inline-block;
}
div#Rxmeters div.value {
display: inline-block;
}
#TX-GAIN_control
{
width:200px;
height:140px;
position:absolute;
left:80px;
top:530px;
font:normal bold 25px tahoma;
text-align:center;
}
#TX-record
{
text-align:center;
font:normal bold 110px tahoma;
width:200px;
height:140px;
position:absolute;
left:80px;
top:690px;
}
#record_opus
{
text-align:center;
width:200px;
height:140px;
position:absolute;
left:30px;
top:805px;
}
#canBFSPC{
position:absolute;
left:400px;
top:320px;
width:450px;
height:200px;
background:black;
background-color:black;
border-radius: 5px 5px 5px 5px;
box-shadow:0px 2px 2px 0px rgba(0, 0, 0, 0.5) inset,0px 2px 2px 0px rgba(255, 255, 255, 0.5);
}
#canBFFFT{
position:absolute;
left:950px;
top:320px;
width:450px;
height:200px;
background:black;
background-color:black;
border-radius: 5px 5px 5px 5px;
box-shadow:0px 2px 2px 0px rgba(0, 0, 0, 0.5) inset,0px 2px 2px 0px rgba(255, 255, 255, 0.5);
}
#div-mode_menu
{
position:absolute;
left: 400px;
top:550px;
width: 1000px;
}
#div-mode_menu > ul
{
padding:0;
width:1000px;
text-align:center;
}
#div-mode_menu > ul > li {
display: inline-block;
font: normal bold 45px tahoma;
text-align:center;
}
#div-mode_menu > ul > li:before{
content: '';
display: inline-block;
vertical-align: middle;
height: 100%;
}
.button_mode{
width:190px;
height: 60px;
}
#personalfrequency
{
position:absolute;
left: 1130px;
top:180px;
height: 80px;
width:240px;
font: normal bold 25px tahoma;
}
#personalfrequency > select
{
width:240px;
font: normal bold 25px tahoma;
}
#hambands
{
position:absolute;
left: 1130px;
top:80px;
height: 80px;
width:240px;
font: normal bold 25px tahoma;
}
#hambands > select
{
width:240px;
font: normal bold 25px tahoma;
}
#canRXsmeter{
position:absolute;
left:1450px;
top:100px;
width:250px;
height:50px;
background:black;
background-color:black;
border-radius: 5px 5px 5px 5px;
box-shadow:0px 2px 2px 0px rgba(0, 0, 0, 0.5) inset,0px 2px 2px 0px rgba(255, 255, 255, 0.5);
}
#div-smeterdigitRX
{
position:absolute;
left:1450px;
top:160px;
}
/* @group Blink */
.blink {
-webkit-animation: blink .75s linear infinite;
-moz-animation: blink .75s linear infinite;
-ms-animation: blink .75s linear infinite;
-o-animation: blink .75s linear infinite;
animation: blink .75s linear infinite;
}
@-webkit-keyframes blink {
0% { opacity: 1; }
50% { opacity: 1; }
50.01% { opacity: 0; }
100% { opacity: 0; }
}
@-moz-keyframes blink {
0% { opacity: 1; }
50% { opacity: 1; }
50.01% { opacity: 0; }
100% { opacity: 0; }
}
@-ms-keyframes blink {
0% { opacity: 1; }
50% { opacity: 1; }
50.01% { opacity: 0; }
100% { opacity: 0; }
}
@-o-keyframes blink {
0% { opacity: 1; }
50% { opacity: 1; }
50.01% { opacity: 0; }
100% { opacity: 0; }
}
@keyframes blink {
0% { opacity: 1; }
50% { opacity: 1; }
50.01% { opacity: 0; }
100% { opacity: 0; }
}
/* @end */