Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v0.0.4? #9

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions afsk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@

from afsk import encode
import ax25

from .ax25 import FCS
12 changes: 5 additions & 7 deletions afsk/afsk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# coding=utf-8

# Bell 202 Audio Frequency Shift Keying
# http://n1vg.net/packet/

Expand All @@ -11,9 +9,9 @@

from bitarray import bitarray

import audiogen
from audiogen.util import multiply
from audiogen.util import constant
import audiogen_p3 as audiogen
from audiogen_p3.util import multiply
from audiogen_p3.util import constant

MARK_HZ = 1200.0
SPACE_HZ = 2200.0
Expand All @@ -30,7 +28,7 @@ def encode(binary_data):
audiogen module.
'''
framed_data = frame(binary_data)

# set volume to 1/2, preceed packet with 1/20 s silence to allow for startup glitches
for sample in itertools.chain(
audiogen.silence(1.05),
Expand All @@ -53,7 +51,7 @@ def modulate(data):
clock = (x / BAUD_RATE for x in itertools.count(1))
tones = (MARK_HZ if bit else SPACE_HZ for bit in data)

for boundary, frequency in itertools.izip(clock, tones):
for boundary, frequency in izip(clock, tones):
# frequency of current symbol is determined by how much
# we advance the signal's phase in each audio frame
phase_change_per_sample = TWO_PI / (audiogen.sampler.FRAME_RATE / frequency)
Expand Down
82 changes: 26 additions & 56 deletions afsk/ax25.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import argparse

from bitarray import bitarray
import audiogen
import audiogen_p3 as audiogen

import afsk
from . import afsk

def bit_stuff(data):
count = 0
Expand Down Expand Up @@ -88,43 +88,31 @@ def fcs_validate(bits):
raise Exception("FCS checksum invalid.")

class AX25(object):
def __init__(
self,
destination=b"APRS",
source=b"",
digipeaters=(b"RELAY", b"WIDE2-1"),
info=b"\""
):
def __init__(self, destination="APRS", source="", digipeaters=("RELAY", "WIDE2-1"), info="\"" ):
self.flag = b"\x7e"

self.destination = destination
self.source = source
self.digipeaters = digipeaters

self.info = info

@classmethod
def callsign_encode(self, callsign):
callsign = callsign.upper()
if callsign.find(b"-") > 0:
callsign, ssid = callsign.split(b"-")
if callsign.find("-") > 0:
callsign, ssid = callsign.split("-")
else:
ssid = b"0"
ssid = "0"

assert(len(ssid) == 1)
assert(len(callsign) <= 6)

callsign = b"{callsign:6s}{ssid}".format(callsign=callsign, ssid=ssid)

callsign = "{callsign:6s}{ssid}".format(callsign=callsign, ssid=ssid)
# now shift left one bit, argh
return b"".join([chr(ord(char) << 1) for char in callsign])
return b"".join([bytes([char << 1]) for char in callsign.encode('utf-8')])

def encoded_addresses(self):
address_bytes = bytearray(b"{destination}{source}{digis}".format(
destination = AX25.callsign_encode(self.destination),
source = AX25.callsign_encode(self.source),
digis = b"".join([AX25.callsign_encode(digi) for digi in self.digipeaters])
))
address = b"".join([AX25.callsign_encode(self.destination),AX25.callsign_encode(self.source), b"".join([AX25.callsign_encode(digi) for digi in self.digipeaters])])
address_bytes = bytearray(address)

# set the low order (first, with eventual little bit endian encoding) bit
# in order to flag the end of the address string
Expand All @@ -133,36 +121,30 @@ def encoded_addresses(self):
return address_bytes

def header(self):
return b"{addresses}{control}{protocol}".format(
addresses = self.encoded_addresses(),
control = self.control_field, # * 8,
protocol = self.protocol_id,
)
return b"".join([self.encoded_addresses(), self.control_field, self.protocol_id])

def packet(self):
return b"{header}{info}{fcs}".format(
flag = self.flag,
header = self.header(),
info = self.info,
fcs = self.fcs()
)
return b"".join([self.flag,self.header(),self.info.encode('utf-8'),self.fcs()])

def unparse(self):
flag = bitarray(endian="little")
flag.frombytes(self.flag)

bits = bitarray(endian="little")
bits.frombytes("".join([self.header(), self.info, self.fcs()]))

bits.frombytes(b"".join([self.header(), self.info.encode('utf-8'), self.fcs()]))
return flag + bit_stuff(bits) + flag

def __repr__(self):
return self.__str__()

def __str__(self):
return b"{source}>{destination},{digis}:{info}".format(
__str__ = "{source}>{destination},{digis}:{info}".format(
destination = self.destination,
source = self.source,
digis = b",".join(self.digipeaters),
digis = ",".join(self.digipeaters),
info = self.info
)
return __str__

@classmethod
def parse(cls, bits):
Expand All @@ -177,7 +159,7 @@ def parse(cls, bits):

def fcs(self):
content = bitarray(endian="little")
content.frombytes("".join([self.header(), self.info]))
content.frombytes(b"".join([self.header(), self.info.encode('utf-8')]))

fcs = FCS()
for bit in content:
Expand All @@ -187,20 +169,8 @@ def fcs(self):
return fcs.digest()

class UI(AX25):
def __init__(
self,
destination=b"APRS",
source=b"",
digipeaters=(b"WIDE1-1", b"WIDE2-1"),
info=b""
):
AX25.__init__(
self,
destination,
source,
digipeaters,
info
)
def __init__(self, destination="APRS", source="", digipeaters=("WIDE1-1", "WIDE2-1"), info=""):
AX25.__init__(self, destination, source, digipeaters, info)
self.control_field = b"\x03"
self.protocol_id = b"\xf0"

Expand All @@ -219,14 +189,14 @@ def main(arguments=None):
)
parser.add_argument(
'--destination',
default=b'APRS',
default='APRS',
help='AX.25 destination address. See http://www.aprs.org/aprs11/tocalls.txt'
)
parser.add_argument(
'-d',
'--digipeaters',
'--path',
default=b'WIDE1-1,WIDE2-1',
default='WIDE1-1,WIDE2-1',
help='Digipeater path to use. "New Paradigm" recommendations are "WIDE1-1,WIDE2-1" for mobile and "WIDE2-1" for fixed stations. Defaults to "WIDE1-1,WIDE2-1."'
)
parser.add_argument(
Expand All @@ -245,14 +215,14 @@ def main(arguments=None):

if args.verbose == 0:
logging.basicConfig(level=logging.INFO)
elif args.verbose >=1:
else:
logging.basicConfig(level=logging.DEBUG)

packet = UI(
destination=args.destination,
source=args.callsign,
info=args.info,
digipeaters=args.digipeaters.split(b','),
digipeaters=args.digipeaters.split(','),
)

logger.info(r"Sending packet: '{0}'".format(packet))
Expand Down
8 changes: 4 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

required_modules = [
'argparse',
'audiogen',
'audiogen_p3',
'bitarray',
]
extras_require = {
Expand All @@ -17,7 +17,7 @@

setup(
name="afsk",
version="0.0.3",
version="0.0.4",
description=u"Bell 202 Audio Frequency Shift Keying encoder and APRS packet audio tools",
author="Christopher H. Casebeer",
author_email="",
Expand All @@ -40,8 +40,8 @@
classifiers=[
"Environment :: Console",
"License :: OSI Approved :: BSD License",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.8",
"Intended Audience :: Developers",
"Intended Audience :: End Users/Desktop",
"Topic :: Multimedia :: Sound/Audio",
Expand Down
22 changes: 11 additions & 11 deletions test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
logging.basicConfig(level=logging.DEBUG)

import afsk
from afsk.ax25 import FCS
from bitarray import bitarray

import crc16
Expand All @@ -15,29 +15,29 @@


def test_packet():
packet = afsk.ax25.UI("APRS", "DUMMY", info=":Test")
packet = ax25.UI("APRS", "DUMMY", info=":Test")
generated_bits = packet.unparse()
generated_bytes = generated_bits.tobytes()
expected_bytes = '~\x82\xa0\xa4\xa6@@`\x88\xaa\x9a\x9a\xb2@`\xae\x92\x88\x8ab@b\xae\x92\x88\x8ad@c\x03\xf0:Test\x9f/\xfb\x01'
#expected_bits = bitarray.bitarray('01111110010000010000010100100101011001010000001000000010000001100001000101010101010110010101100101001101000000100000011001110101010010010001000101010001010001100000001001000110011101010100100100010001010100010010011000000010110001101100000000001111010111000010101010100110110011100010111011111001111101001101111110')

print "Unstuffed body BA:\n%s" % unstuffed_body
print ("Unstuffed body BA:\n%s" % unstuffed_body)

print "checksummed_content_bits:\n%r" % "".join([packet.header(), packet.info])
print ("checksummed_content_bits:\n%r %r" % (packet.header(), packet.info))

print "BS PACKET:\n%r" % (bs_packet)
print "Packet:\n%s\n%r\nHeader:\n%r" % (packet, packet.packet(), packet.header())
print "BS HEADER:\n%r" % (bs_header)
print "Generated:\n%r\nExpected:\n%r" % (generated_bytes, expected_bytes)
print ("BS PACKET:\n%r" % (bs_packet))
print ("Packet:\n%s\n%r\nHeader:\n%r" % (packet, packet.packet(), packet.header()))
print ("BS HEADER:\n%r" % (bs_header))
print ("Generated:\n%r\nExpected:\n%r" % (generated_bytes, expected_bytes))

#assert generated_bits == expected_bits
assert generated_bytes == expected_bytes

def test_fcs():
fcs = afsk.ax25.FCS()
bytes = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
fcs = FCS()
str_bytes = b'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
bits = bitarray()
bits.frombytes(bytes)
bits.frombytes(str_bytes)

for bit in bits:
fcs.update_bit(bit)
Expand Down