wireless-regdb: move patch

This commit is contained in:
coolsnowwolf 2020-02-21 21:25:39 +08:00
parent 8cb3f52340
commit 37ba200723
6 changed files with 616 additions and 958 deletions

View File

@ -20,7 +20,6 @@ define Package/wireless-regdb
endef
define Build/Compile
$(CP) ./db2bin.py ./db2fw.py ./dbparse.py $(PKG_BUILD_DIR)/
$(STAGING_DIR_HOST)/bin/$(PYTHON) $(PKG_BUILD_DIR)/db2fw.py $(PKG_BUILD_DIR)/regulatory.db $(PKG_BUILD_DIR)/db.txt
endef

View File

@ -1,147 +0,0 @@
#!/usr/bin/env python
from io import BytesIO, open
import struct
import hashlib
from dbparse import DBParser
import sys
MAGIC = 0x52474442
VERSION = 19
if len(sys.argv) < 3:
print('Usage: %s output-file input-file [key-file]' % sys.argv[0])
sys.exit(2)
def create_rules(countries):
result = {}
for c in countries.values():
for rule in c.permissions:
result[rule] = 1
return list(result)
def create_collections(countries):
result = {}
for c in countries.values():
result[c.permissions] = 1
return list(result)
def be32(output, val):
output.write(struct.pack('>I', val))
class PTR(object):
def __init__(self, output):
self._output = output
self._pos = output.tell()
be32(output, 0xFFFFFFFF)
def set(self, val=None):
if val is None:
val = self._output.tell()
self._offset = val
pos = self._output.tell()
self._output.seek(self._pos)
be32(self._output, val)
self._output.seek(pos)
def get(self):
return self._offset
p = DBParser()
countries = p.parse(open(sys.argv[2], 'r', encoding='utf-8'))
countrynames = list(countries)
countrynames.sort()
power = []
bands = []
for alpha2 in countrynames:
for perm in countries[alpha2].permissions:
if not perm.freqband in bands:
bands.append(perm.freqband)
if not perm.power in power:
power.append(perm.power)
rules = create_rules(countries)
rules.sort()
collections = create_collections(countries)
collections.sort()
output = BytesIO()
# struct regdb_file_header
be32(output, MAGIC)
be32(output, VERSION)
reg_country_ptr = PTR(output)
# add number of countries
be32(output, len(countries))
siglen = PTR(output)
power_rules = {}
for pr in power:
power_rules[pr] = output.tell()
pr = [int(v * 100.0) for v in (pr.max_ant_gain, pr.max_eirp)]
# struct regdb_file_power_rule
output.write(struct.pack('>II', *pr))
freq_ranges = {}
for fr in bands:
freq_ranges[fr] = output.tell()
fr = [int(f * 1000.0) for f in (fr.start, fr.end, fr.maxbw)]
# struct regdb_file_freq_range
output.write(struct.pack('>III', *fr))
reg_rules = {}
for reg_rule in rules:
freq_range, power_rule = reg_rule.freqband, reg_rule.power
reg_rules[reg_rule] = output.tell()
# struct regdb_file_reg_rule
output.write(struct.pack('>III', freq_ranges[freq_range], power_rules[power_rule],
reg_rule.flags))
reg_rules_collections = {}
for coll in collections:
reg_rules_collections[coll] = output.tell()
# struct regdb_file_reg_rules_collection
coll = list(coll)
be32(output, len(coll))
coll.sort()
for regrule in coll:
be32(output, reg_rules[regrule])
# update country pointer now!
reg_country_ptr.set()
for alpha2 in countrynames:
coll = countries[alpha2]
# struct regdb_file_reg_country
output.write(struct.pack('>2sxBI', alpha2, coll.dfs_region, reg_rules_collections[coll.permissions]))
if len(sys.argv) > 3:
# Load RSA only now so people can use this script
# without having those libraries installed to verify
# their SQL changes
from M2Crypto import RSA
# determine signature length
key = RSA.load_key(sys.argv[3])
hash = hashlib.sha1()
hash.update(output.getvalue())
sig = key.sign(hash.digest())
# write it to file
siglen.set(len(sig))
# sign again
hash = hashlib.sha1()
hash.update(output.getvalue())
sig = key.sign(hash.digest())
output.write(sig)
else:
siglen.set(0)
outfile = open(sys.argv[1], 'wb')
outfile.write(output.getvalue())

View File

@ -1,158 +0,0 @@
#!/usr/bin/env python
from io import BytesIO, open
import struct
import hashlib
from dbparse import DBParser
import sys
from math import log
MAGIC = 0x52474442
VERSION = 20
if len(sys.argv) < 3:
print('Usage: %s output-file input-file' % sys.argv[0])
sys.exit(2)
def create_rules(countries):
result = {}
for c in countries.values():
for rule in c.permissions:
result[rule] = 1
return list(result)
def create_collections(countries):
result = {}
for c in countries.values():
result[(c.permissions, c.dfs_region)] = 1
return list(result)
def create_wmms(countries):
result = {}
for c in countries.values():
for rule in c.permissions:
if rule.wmmrule is not None:
result[rule.wmmrule] = 1
return list(result)
def be32(output, val):
output.write(struct.pack('>I', val))
def be16(output, val):
output.write(struct.pack('>H', val))
class PTR(object):
def __init__(self, output):
self._output = output
self._pos = output.tell()
be16(output, 0)
self._written = False
def set(self, val=None):
if val is None:
val = self._output.tell()
assert val & 3 == 0
self._offset = val
pos = self._output.tell()
self._output.seek(self._pos)
be16(self._output, val >> 2)
self._output.seek(pos)
self._written = True
def get(self):
return self._offset
@property
def written(self):
return self._written
p = DBParser()
countries = p.parse(open(sys.argv[2], 'r', encoding='utf-8'))
rules = create_rules(countries)
rules.sort()
collections = create_collections(countries)
collections.sort()
wmms = create_wmms(countries)
wmms.sort()
output = BytesIO()
# struct regdb_file_header
be32(output, MAGIC)
be32(output, VERSION)
country_ptrs = {}
countrynames = list(countries)
countrynames.sort()
for alpha2 in countrynames:
coll = countries[alpha2]
output.write(struct.pack('>2s', alpha2))
country_ptrs[alpha2] = PTR(output)
output.write(b'\x00' * 4)
wmmdb = {}
for w in wmms:
assert output.tell() & 3 == 0
wmmdb[w] = output.tell() >> 2
for r in w._as_tuple():
ecw = int(log(r[0] + 1, 2)) << 4 | int(log(r[1] + 1, 2))
ac = (ecw, r[2],r[3])
output.write(struct.pack('>BBH', *ac))
reg_rules = {}
flags = 0
for reg_rule in rules:
freq_range, power_rule, wmm_rule = reg_rule.freqband, reg_rule.power, reg_rule.wmmrule
reg_rules[reg_rule] = output.tell()
assert power_rule.max_ant_gain == 0
flags = 0
# convert to new rule flags
assert reg_rule.flags & ~0x899 == 0
if reg_rule.flags & 1<<0:
flags |= 1<<0
if reg_rule.flags & 1<<3:
flags |= 1<<1
if reg_rule.flags & 1<<4:
flags |= 1<<2
if reg_rule.flags & 1<<7:
flags |= 1<<3
if reg_rule.flags & 1<<11:
flags |= 1<<4
rule_len = 16
cac_timeout = 0 # TODO
if not (flags & 1<<2):
cac_timeout = 0
if cac_timeout or wmm_rule:
rule_len += 2
if wmm_rule is not None:
rule_len += 2
output.write(struct.pack('>BBHIII', rule_len, flags, int(power_rule.max_eirp * 100),
int(freq_range.start * 1000), int(freq_range.end * 1000), int(freq_range.maxbw * 1000),
))
if rule_len > 16:
output.write(struct.pack('>H', cac_timeout))
if rule_len > 18:
be16(output, wmmdb[wmm_rule])
while rule_len % 4:
output.write('\0')
rule_len += 1
for coll in collections:
for alpha2 in countrynames:
if (countries[alpha2].permissions, countries[alpha2].dfs_region) == coll:
assert not country_ptrs[alpha2].written
country_ptrs[alpha2].set()
slen = 3
output.write(struct.pack('>BBBx', slen, len(list(coll[0])), coll[1]))
coll = list(coll[0])
for regrule in coll:
be16(output, reg_rules[regrule] >> 2)
if len(coll) % 2:
be16(output, 0)
for alpha2 in countrynames:
assert country_ptrs[alpha2].written
outfile = open(sys.argv[1], 'wb')
outfile.write(output.getvalue())

View File

@ -1,515 +0,0 @@
#!/usr/bin/env python
from functools import total_ordering
import sys, math
from math import ceil, log
from collections import defaultdict, OrderedDict
# must match <linux/nl80211.h> enum nl80211_reg_rule_flags
flag_definitions = {
'NO-OFDM': 1<<0,
'NO-CCK': 1<<1,
'NO-INDOOR': 1<<2,
'NO-OUTDOOR': 1<<3,
'DFS': 1<<4,
'PTP-ONLY': 1<<5,
'PTMP-ONLY': 1<<6,
'NO-IR': 1<<7,
# hole at bit 8
# hole at bit 9. FIXME: Where is NO-HT40 defined?
'NO-HT40': 1<<10,
'AUTO-BW': 1<<11,
}
dfs_regions = {
'DFS-FCC': 1,
'DFS-ETSI': 2,
'DFS-JP': 3,
}
@total_ordering
class WmmRule(object):
def __init__(self, vo_c, vi_c, be_c, bk_c, vo_ap, vi_ap, be_ap, bk_ap):
self.vo_c = vo_c
self.vi_c = vi_c
self.be_c = be_c
self.bk_c = bk_c
self.vo_ap = vo_ap
self.vi_ap = vi_ap
self.be_ap = be_ap
self.bk_ap = bk_ap
def _as_tuple(self):
return (self.vo_c, self.vi_c, self.be_c, self.bk_c,
self.vo_ap, self.vi_ap, self.be_ap, self.bk_ap)
def __eq__(self, other):
if other is None:
return False
return (self._as_tuple() == other._as_tuple())
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
if other is None:
return False
return (self._as_tuple() < other._as_tuple())
def __hash__(self):
return hash(self._as_tuple())
class FreqBand(object):
def __init__(self, start, end, bw, comments=None):
self.start = start
self.end = end
self.maxbw = bw
self.comments = comments or []
def _as_tuple(self):
return (self.start, self.end, self.maxbw)
def __eq__(self, other):
return (self._as_tuple() == other._as_tuple())
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
return (self._as_tuple() < other._as_tuple())
def __hash__(self):
return hash(self._as_tuple())
def __str__(self):
return '<FreqBand %.3f - %.3f @ %.3f>' % (
self.start, self.end, self.maxbw)
@total_ordering
class PowerRestriction(object):
def __init__(self, max_ant_gain, max_eirp, comments = None):
self.max_ant_gain = max_ant_gain
self.max_eirp = max_eirp
self.comments = comments or []
def _as_tuple(self):
return (self.max_ant_gain, self.max_eirp)
def __eq__(self, other):
return (self._as_tuple() == other._as_tuple())
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
return (self._as_tuple() < other._as_tuple())
def __hash__(self):
return hash(self._as_tuple())
def __str__(self):
return '<PowerRestriction ...>'
class DFSRegionError(Exception):
def __init__(self, dfs_region):
self.dfs_region = dfs_region
class FlagError(Exception):
def __init__(self, flag):
self.flag = flag
@total_ordering
class Permission(object):
def __init__(self, freqband, power, flags, wmmrule):
assert isinstance(freqband, FreqBand)
assert isinstance(power, PowerRestriction)
assert isinstance(wmmrule, WmmRule) or wmmrule is None
self.freqband = freqband
self.power = power
self.wmmrule = wmmrule
self.flags = 0
for flag in flags:
if not flag in flag_definitions:
raise FlagError(flag)
self.flags |= flag_definitions[flag]
self.textflags = flags
def _as_tuple(self):
return (self.freqband, self.power, self.flags, self.wmmrule)
def __eq__(self, other):
return (self._as_tuple() == other._as_tuple())
def __ne__(self, other):
return not (self == other)
def __lt__(self, other):
return (self._as_tuple() < other._as_tuple())
def __hash__(self):
return hash(self._as_tuple())
def __str__(self):
return str(self.freqband) + str(self.power) + str(self.wmmrule)
class Country(object):
def __init__(self, dfs_region, permissions=None, comments=None):
self._permissions = permissions or []
self.comments = comments or []
self.dfs_region = 0
if dfs_region:
if not dfs_region in dfs_regions:
raise DFSRegionError(dfs_region)
self.dfs_region = dfs_regions[dfs_region]
def add(self, perm):
assert isinstance(perm, Permission)
self._permissions.append(perm)
self._permissions.sort()
def __contains__(self, perm):
assert isinstance(perm, Permission)
return perm in self._permissions
def __str__(self):
r = ['(%s, %s)' % (str(b), str(p)) for b, p in self._permissions]
return '<Country (%s)>' % (', '.join(r))
def _get_permissions_tuple(self):
return tuple(self._permissions)
permissions = property(_get_permissions_tuple)
class SyntaxError(Exception):
pass
class DBParser(object):
def __init__(self, warn=None):
self._warn_callout = warn or sys.stderr.write
def _syntax_error(self, txt=None):
txt = txt and ' (%s)' % txt or ''
raise SyntaxError("Syntax error in line %d%s" % (self._lineno, txt))
def _warn(self, txt):
self._warn_callout("Warning (line %d): %s\n" % (self._lineno, txt))
def _parse_band_def(self, bname, banddef, dupwarn=True):
try:
freqs, bw = banddef.split('@')
bw = float(bw)
except ValueError:
bw = 20.0
try:
start, end = freqs.split('-')
start = float(start)
end = float(end)
# The kernel will reject these, so might as well reject this
# upon building it.
if start <= 0:
self._syntax_error("Invalid start freq (%d)" % start)
if end <= 0:
self._syntax_error("Invalid end freq (%d)" % end)
if start > end:
self._syntax_error("Inverted freq range (%d - %d)" % (start, end))
if start == end:
self._syntax_error("Start and end freqs are equal (%d)" % start)
except ValueError:
self._syntax_error("band must have frequency range")
b = FreqBand(start, end, bw, comments=self._comments)
self._comments = []
self._banddup[bname] = bname
if b in self._bandrev:
if dupwarn:
self._warn('Duplicate band definition ("%s" and "%s")' % (
bname, self._bandrev[b]))
self._banddup[bname] = self._bandrev[b]
self._bands[bname] = b
self._bandrev[b] = bname
self._bandline[bname] = self._lineno
def _parse_band(self, line):
try:
bname, line = line.split(':', 1)
if not bname:
self._syntax_error("'band' keyword must be followed by name")
except ValueError:
self._syntax_error("band name must be followed by colon")
if bname in flag_definitions:
self._syntax_error("Invalid band name")
self._parse_band_def(bname, line)
def _parse_power(self, line):
try:
pname, line = line.split(':', 1)
if not pname:
self._syntax_error("'power' keyword must be followed by name")
except ValueError:
self._syntax_error("power name must be followed by colon")
if pname in flag_definitions:
self._syntax_error("Invalid power name")
self._parse_power_def(pname, line)
def _parse_power_def(self, pname, line, dupwarn=True):
try:
max_eirp = line
if max_eirp == 'N/A':
max_eirp = '0'
max_ant_gain = float(0)
def conv_pwr(pwr):
if pwr.endswith('mW'):
pwr = float(pwr[:-2])
return 10.0 * math.log10(pwr)
else:
return float(pwr)
max_eirp = conv_pwr(max_eirp)
except ValueError:
self._syntax_error("invalid power data")
p = PowerRestriction(max_ant_gain, max_eirp,
comments=self._comments)
self._comments = []
self._powerdup[pname] = pname
if p in self._powerrev:
if dupwarn:
self._warn('Duplicate power definition ("%s" and "%s")' % (
pname, self._powerrev[p]))
self._powerdup[pname] = self._powerrev[p]
self._power[pname] = p
self._powerrev[p] = pname
self._powerline[pname] = self._lineno
def _parse_wmmrule(self, line):
regions = line[:-1].strip()
if not regions:
self._syntax_error("'wmmrule' keyword must be followed by region")
regions = regions.split(',')
self._current_regions = {}
for region in regions:
if region in self._wmm_rules:
self._warn("region %s was added already to wmm rules" % region)
self._current_regions[region] = 1
self._comments = []
def _validate_input(self, cw_min, cw_max, aifsn, cot):
if cw_min < 1:
self._syntax_error("Invalid cw_min value (%d)" % cw_min)
if cw_max < 1:
self._syntax_error("Invalid cw_max value (%d)" % cw_max)
if cw_min > cw_max:
self._syntax_error("Inverted contention window (%d - %d)" %
(cw_min, cw_max))
if not (bin(cw_min + 1).count('1') == 1 and cw_min < 2**15):
self._syntax_error("Invalid cw_min value should be power of 2 - 1 (%d)"
% cw_min)
if not (bin(cw_max + 1).count('1') == 1 and cw_max < 2**15):
self._syntax_error("Invalid cw_max value should be power of 2 - 1 (%d)"
% cw_max)
if aifsn < 1:
self._syntax_error("Invalid aifsn value (%d)" % aifsn)
if cot < 0:
self._syntax_error("Invalid cot value (%d)" % cot)
def _validate_size(self, var, bytcnt):
return bytcnt < ceil(len(bin(var)[2:]) / 8.0)
def _parse_wmmrule_item(self, line):
bytcnt = (2.0, 2.0, 1.0, 2.0)
try:
ac, cval = line.split(':')
if not ac:
self._syntax_error("wmm item must have ac prefix")
except ValueError:
self._syntax_error("access category must be followed by colon")
p = tuple([int(v.split('=', 1)[1]) for v in cval.split(',')])
self._validate_input(*p)
for v, b in zip(p, bytcnt):
if self._validate_size(v, b):
self._syntax_error("unexpected input size expect %d got %d"
% (b, v))
for r in self._current_regions:
self._wmm_rules[r][ac] = p
def _parse_country(self, line):
try:
cname, cvals= line.split(':', 1)
dfs_region = cvals.strip()
if not cname:
self._syntax_error("'country' keyword must be followed by name")
except ValueError:
self._syntax_error("country name must be followed by colon")
cnames = cname.split(',')
self._current_countries = {}
for cname in cnames:
if len(cname) != 2:
self._warn("country '%s' not alpha2" % cname)
cname = cname.encode('ascii')
if not cname in self._countries:
self._countries[cname] = Country(dfs_region, comments=self._comments)
self._current_countries[cname] = self._countries[cname]
self._comments = []
def _parse_country_item(self, line):
if line[0] == '(':
try:
band, line = line[1:].split('),', 1)
bname = 'UNNAMED %d' % self._lineno
self._parse_band_def(bname, band, dupwarn=False)
except:
self._syntax_error("Badly parenthesised band definition")
else:
try:
bname, line = line.split(',', 1)
if not bname:
self._syntax_error("country definition must have band")
if not line:
self._syntax_error("country definition must have power")
except ValueError:
self._syntax_error("country definition must have band and power")
if line[0] == '(':
items = line.split('),', 1)
if len(items) == 1:
pname = items[0]
line = ''
if not pname[-1] == ')':
self._syntax_error("Badly parenthesised power definition")
pname = pname[:-1]
flags = []
else:
pname = items[0]
flags = items[1].split(',')
power = pname[1:]
pname = 'UNNAMED %d' % self._lineno
self._parse_power_def(pname, power, dupwarn=False)
else:
line = line.split(',')
pname = line[0]
flags = line[1:]
w = None
if flags and 'wmmrule' in flags[-1]:
try:
region = flags.pop().split('=', 1)[1]
if region not in self._wmm_rules.keys():
self._syntax_error("No wmm rule for %s" % region)
except IndexError:
self._syntax_error("flags is empty list or no region was found")
w = WmmRule(*self._wmm_rules[region].values())
if not bname in self._bands:
self._syntax_error("band does not exist")
if not pname in self._power:
self._syntax_error("power does not exist")
self._bands_used[bname] = True
self._power_used[pname] = True
# de-duplicate so binary database is more compact
bname = self._banddup[bname]
pname = self._powerdup[pname]
b = self._bands[bname]
p = self._power[pname]
try:
perm = Permission(b, p, flags, w)
except FlagError as e:
self._syntax_error("Invalid flag '%s'" % e.flag)
for cname, c in self._current_countries.items():
if perm in c:
self._warn('Rule "%s, %s" added to "%s" twice' % (
bname, pname, cname))
else:
c.add(perm)
def parse(self, f):
self._current_countries = None
self._current_regions = None
self._bands = {}
self._power = {}
self._countries = {}
self._bands_used = {}
self._power_used = {}
self._bandrev = {}
self._powerrev = {}
self._banddup = {}
self._powerdup = {}
self._bandline = {}
self._powerline = {}
self._wmm_rules = defaultdict(lambda: OrderedDict())
self._comments = []
self._lineno = 0
for line in f:
self._lineno += 1
line = line.strip()
if line[0:1] == '#':
self._comments.append(line[1:].strip())
line = line.replace(' ', '').replace('\t', '')
if not line:
self._current_regions = None
self._comments = []
line = line.split('#')[0]
if not line:
continue
if line[0:4] == 'band':
self._parse_band(line[4:])
self._current_countries = None
self._current_regions = None
self._comments = []
elif line[0:5] == 'power':
self._parse_power(line[5:])
self._current_countries = None
self._current_regions = None
self._comments = []
elif line[0:7] == 'country':
self._parse_country(line[7:])
self._comments = []
self._current_regions = None
elif self._current_countries is not None:
self._current_regions = None
self._parse_country_item(line)
self._comments = []
elif line[0:7] == 'wmmrule':
self._parse_wmmrule(line[7:])
self._current_countries = None
self._comments = []
elif self._current_regions is not None:
self._parse_wmmrule_item(line)
self._current_countries = None
self._comments = []
else:
self._syntax_error("Expected band, power or country definition")
countries = self._countries
bands = {}
for k, v in self._bands.items():
if k in self._bands_used:
bands[self._banddup[k]] = v
continue
# we de-duplicated, but don't warn again about the dupes
if self._banddup[k] == k:
self._lineno = self._bandline[k]
self._warn('Unused band definition "%s"' % k)
power = {}
for k, v in self._power.items():
if k in self._power_used:
power[self._powerdup[k]] = v
continue
# we de-duplicated, but don't warn again about the dupes
if self._powerdup[k] == k:
self._lineno = self._powerline[k]
self._warn('Unused power definition "%s"' % k)
return countries

View File

@ -112,140 +112,4 @@ Signed-off-by: Johannes Berg <johannes.berg@intel.com>
+that compile the database to its binary formats.
For more information, please see the CRDA git repository:
--- /dev/null
+++ b/db2fw.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+
+from cStringIO import StringIO
+import struct
+import hashlib
+from dbparse import DBParser
+import sys
+
+MAGIC = 0x52474442
+VERSION = 20
+
+if len(sys.argv) < 3:
+ print 'Usage: %s output-file input-file' % sys.argv[0]
+ sys.exit(2)
+
+def create_rules(countries):
+ result = {}
+ for c in countries.itervalues():
+ for rule in c.permissions:
+ result[rule] = 1
+ return result.keys()
+
+def create_collections(countries):
+ result = {}
+ for c in countries.itervalues():
+ result[(c.permissions, c.dfs_region)] = 1
+ return result.keys()
+
+
+def be32(output, val):
+ output.write(struct.pack('>I', val))
+def be16(output, val):
+ output.write(struct.pack('>H', val))
+
+class PTR(object):
+ def __init__(self, output):
+ self._output = output
+ self._pos = output.tell()
+ be16(output, 0)
+ self._written = False
+
+ def set(self, val=None):
+ if val is None:
+ val = self._output.tell()
+ assert val & 3 == 0
+ self._offset = val
+ pos = self._output.tell()
+ self._output.seek(self._pos)
+ be16(self._output, val >> 2)
+ self._output.seek(pos)
+ self._written = True
+
+ def get(self):
+ return self._offset
+
+ @property
+ def written(self):
+ return self._written
+
+p = DBParser()
+countries = p.parse(file(sys.argv[2]))
+rules = create_rules(countries)
+rules.sort(cmp=lambda x, y: cmp(x.freqband, y.freqband))
+collections = create_collections(countries)
+collections.sort(cmp=lambda x, y: cmp(x[0][0].freqband, y[0][0].freqband))
+
+output = StringIO()
+
+# struct regdb_file_header
+be32(output, MAGIC)
+be32(output, VERSION)
+
+country_ptrs = {}
+countrynames = countries.keys()
+countrynames.sort()
+for alpha2 in countrynames:
+ coll = countries[alpha2]
+ output.write(struct.pack('>cc', str(alpha2[0]), str(alpha2[1])))
+ country_ptrs[alpha2] = PTR(output)
+output.write('\x00' * 4)
+
+reg_rules = {}
+flags = 0
+for reg_rule in rules:
+ freq_range, power_rule = reg_rule.freqband, reg_rule.power
+ reg_rules[reg_rule] = output.tell()
+ assert power_rule.max_ant_gain == 0
+ flags = 0
+ # convert to new rule flags
+ assert reg_rule.flags & ~0x899 == 0
+ if reg_rule.flags & 1<<0:
+ flags |= 1<<0
+ if reg_rule.flags & 1<<3:
+ flags |= 1<<1
+ if reg_rule.flags & 1<<4:
+ flags |= 1<<2
+ if reg_rule.flags & 1<<7:
+ flags |= 1<<3
+ if reg_rule.flags & 1<<11:
+ flags |= 1<<4
+ rule_len = 16
+ cac_timeout = 0 # TODO
+ if not (flags & 1<<2):
+ cac_timeout = 0
+ if cac_timeout:
+ rule_len += 2
+ output.write(struct.pack('>BBHIII', rule_len, flags, power_rule.max_eirp * 100,
+ freq_range.start * 1000, freq_range.end * 1000, freq_range.maxbw * 1000,
+ ))
+ if cac_timeout:
+ output.write(struct.pack('>H', cac_timeout))
+ while rule_len % 4:
+ output.write('\0')
+ rule_len += 1
+
+for coll in collections:
+ for alpha2 in countrynames:
+ if (countries[alpha2].permissions, countries[alpha2].dfs_region) == coll:
+ assert not country_ptrs[alpha2].written
+ country_ptrs[alpha2].set()
+ slen = 3
+ output.write(struct.pack('>BBBx', slen, len(list(coll[0])), coll[1]))
+ coll = list(coll[0])
+ for regrule in coll:
+ be16(output, reg_rules[regrule] >> 2)
+ if len(coll) % 2:
+ be16(output, 0)
+
+for alpha2 in countrynames:
+ assert country_ptrs[alpha2].written
+
+outfile = open(sys.argv[1], 'w')
+outfile.write(output.getvalue())

View File

@ -0,0 +1,615 @@
diff --git a/db2bin.py b/db2bin.py
--- a/db2bin.py
+++ b/db2bin.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
-from cStringIO import StringIO
+from io import BytesIO, open
import struct
import hashlib
from dbparse import DBParser
@@ -10,21 +10,21 @@
VERSION = 19
if len(sys.argv) < 3:
- print 'Usage: %s output-file input-file [key-file]' % sys.argv[0]
+ print('Usage: %s output-file input-file [key-file]' % sys.argv[0])
sys.exit(2)
def create_rules(countries):
result = {}
- for c in countries.itervalues():
+ for c in countries.values():
for rule in c.permissions:
result[rule] = 1
- return result.keys()
+ return list(result)
def create_collections(countries):
result = {}
- for c in countries.itervalues():
+ for c in countries.values():
result[c.permissions] = 1
- return result.keys()
+ return list(result)
def be32(output, val):
@@ -49,21 +49,25 @@
return self._offset
p = DBParser()
-countries = p.parse(file(sys.argv[2]))
+countries = p.parse(open(sys.argv[2], 'r', encoding='utf-8'))
+
+countrynames = list(countries)
+countrynames.sort()
+
power = []
bands = []
-for c in countries.itervalues():
- for perm in c.permissions:
+for alpha2 in countrynames:
+ for perm in countries[alpha2].permissions:
if not perm.freqband in bands:
bands.append(perm.freqband)
if not perm.power in power:
power.append(perm.power)
rules = create_rules(countries)
-rules.sort(cmp=lambda x, y: cmp(x.freqband, y.freqband))
+rules.sort()
collections = create_collections(countries)
-collections.sort(cmp=lambda x, y: cmp(x[0].freqband, y[0].freqband))
+collections.sort()
-output = StringIO()
+output = BytesIO()
# struct regdb_file_header
be32(output, MAGIC)
@@ -104,19 +108,17 @@
# struct regdb_file_reg_rules_collection
coll = list(coll)
be32(output, len(coll))
- coll.sort(cmp=lambda x, y: cmp(x.freqband, y.freqband))
+ coll.sort()
for regrule in coll:
be32(output, reg_rules[regrule])
# update country pointer now!
reg_country_ptr.set()
-countrynames = countries.keys()
-countrynames.sort()
for alpha2 in countrynames:
coll = countries[alpha2]
# struct regdb_file_reg_country
- output.write(struct.pack('>ccxBI', str(alpha2[0]), str(alpha2[1]), coll.dfs_region, reg_rules_collections[coll.permissions]))
+ output.write(struct.pack('>2sxBI', alpha2, coll.dfs_region, reg_rules_collections[coll.permissions]))
if len(sys.argv) > 3:
@@ -141,5 +143,5 @@
else:
siglen.set(0)
-outfile = open(sys.argv[1], 'w')
+outfile = open(sys.argv[1], 'wb')
outfile.write(output.getvalue())
diff --git a/dbparse.py b/dbparse.py
--- a/dbparse.py
+++ b/dbparse.py
@@ -1,6 +1,9 @@
#!/usr/bin/env python
+from functools import total_ordering
import sys, math
+from math import ceil, log
+from collections import defaultdict, OrderedDict
# must match <linux/nl80211.h> enum nl80211_reg_rule_flags
@@ -25,6 +28,40 @@
'DFS-JP': 3,
}
+@total_ordering
+
+class WmmRule(object):
+
+ def __init__(self, vo_c, vi_c, be_c, bk_c, vo_ap, vi_ap, be_ap, bk_ap):
+ self.vo_c = vo_c
+ self.vi_c = vi_c
+ self.be_c = be_c
+ self.bk_c = bk_c
+ self.vo_ap = vo_ap
+ self.vi_ap = vi_ap
+ self.be_ap = be_ap
+ self.bk_ap = bk_ap
+
+ def _as_tuple(self):
+ return (self.vo_c, self.vi_c, self.be_c, self.bk_c,
+ self.vo_ap, self.vi_ap, self.be_ap, self.bk_ap)
+
+ def __eq__(self, other):
+ if other is None:
+ return False
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ if other is None:
+ return False
+ return (self._as_tuple() < other._as_tuple())
+
+ def __hash__(self):
+ return hash(self._as_tuple())
+
class FreqBand(object):
def __init__(self, start, end, bw, comments=None):
self.start = start
@@ -32,41 +69,49 @@
self.maxbw = bw
self.comments = comments or []
- def __cmp__(self, other):
- s = self
- o = other
- if not isinstance(o, FreqBand):
- return False
- return cmp((s.start, s.end, s.maxbw), (o.start, o.end, o.maxbw))
+ def _as_tuple(self):
+ return (self.start, self.end, self.maxbw)
+
+ def __eq__(self, other):
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ return (self._as_tuple() < other._as_tuple())
def __hash__(self):
- s = self
- return hash((s.start, s.end, s.maxbw))
+ return hash(self._as_tuple())
def __str__(self):
return '<FreqBand %.3f - %.3f @ %.3f>' % (
self.start, self.end, self.maxbw)
+@total_ordering
class PowerRestriction(object):
def __init__(self, max_ant_gain, max_eirp, comments = None):
self.max_ant_gain = max_ant_gain
self.max_eirp = max_eirp
self.comments = comments or []
- def __cmp__(self, other):
- s = self
- o = other
- if not isinstance(o, PowerRestriction):
- return False
- return cmp((s.max_ant_gain, s.max_eirp),
- (o.max_ant_gain, o.max_eirp))
+ def _as_tuple(self):
+ return (self.max_ant_gain, self.max_eirp)
- def __str__(self):
- return '<PowerRestriction ...>'
+ def __eq__(self, other):
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ return (self._as_tuple() < other._as_tuple())
def __hash__(self):
- s = self
- return hash((s.max_ant_gain, s.max_eirp))
+ return hash(self._as_tuple())
+
+ def __str__(self):
+ return '<PowerRestriction ...>'
class DFSRegionError(Exception):
def __init__(self, dfs_region):
@@ -76,12 +121,15 @@
def __init__(self, flag):
self.flag = flag
+@total_ordering
class Permission(object):
- def __init__(self, freqband, power, flags):
+ def __init__(self, freqband, power, flags, wmmrule):
assert isinstance(freqband, FreqBand)
assert isinstance(power, PowerRestriction)
+ assert isinstance(wmmrule, WmmRule) or wmmrule is None
self.freqband = freqband
self.power = power
+ self.wmmrule = wmmrule
self.flags = 0
for flag in flags:
if not flag in flag_definitions:
@@ -90,26 +138,33 @@
self.textflags = flags
def _as_tuple(self):
- return (self.freqband, self.power, self.flags)
+ return (self.freqband, self.power, self.flags, self.wmmrule)
- def __cmp__(self, other):
- if not isinstance(other, Permission):
- return False
- return cmp(self._as_tuple(), other._as_tuple())
+ def __eq__(self, other):
+ return (self._as_tuple() == other._as_tuple())
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __lt__(self, other):
+ return (self._as_tuple() < other._as_tuple())
def __hash__(self):
return hash(self._as_tuple())
+ def __str__(self):
+ return str(self.freqband) + str(self.power) + str(self.wmmrule)
+
class Country(object):
def __init__(self, dfs_region, permissions=None, comments=None):
self._permissions = permissions or []
self.comments = comments or []
- self.dfs_region = 0
+ self.dfs_region = 0
- if dfs_region:
- if not dfs_region in dfs_regions:
- raise DFSRegionError(dfs_region)
- self.dfs_region = dfs_regions[dfs_region]
+ if dfs_region:
+ if not dfs_region in dfs_regions:
+ raise DFSRegionError(dfs_region)
+ self.dfs_region = dfs_regions[dfs_region]
def add(self, perm):
assert isinstance(perm, Permission)
@@ -233,6 +288,61 @@
self._powerrev[p] = pname
self._powerline[pname] = self._lineno
+ def _parse_wmmrule(self, line):
+ regions = line[:-1].strip()
+ if not regions:
+ self._syntax_error("'wmmrule' keyword must be followed by region")
+
+ regions = regions.split(',')
+
+ self._current_regions = {}
+ for region in regions:
+ if region in self._wmm_rules:
+ self._warn("region %s was added already to wmm rules" % region)
+ self._current_regions[region] = 1
+ self._comments = []
+
+ def _validate_input(self, cw_min, cw_max, aifsn, cot):
+ if cw_min < 1:
+ self._syntax_error("Invalid cw_min value (%d)" % cw_min)
+ if cw_max < 1:
+ self._syntax_error("Invalid cw_max value (%d)" % cw_max)
+ if cw_min > cw_max:
+ self._syntax_error("Inverted contention window (%d - %d)" %
+ (cw_min, cw_max))
+ if not (bin(cw_min + 1).count('1') == 1 and cw_min < 2**15):
+ self._syntax_error("Invalid cw_min value should be power of 2 - 1 (%d)"
+ % cw_min)
+ if not (bin(cw_max + 1).count('1') == 1 and cw_max < 2**15):
+ self._syntax_error("Invalid cw_max value should be power of 2 - 1 (%d)"
+ % cw_max)
+ if aifsn < 1:
+ self._syntax_error("Invalid aifsn value (%d)" % aifsn)
+ if cot < 0:
+ self._syntax_error("Invalid cot value (%d)" % cot)
+
+
+ def _validate_size(self, var, bytcnt):
+ return bytcnt < ceil(len(bin(var)[2:]) / 8.0)
+
+ def _parse_wmmrule_item(self, line):
+ bytcnt = (2.0, 2.0, 1.0, 2.0)
+ try:
+ ac, cval = line.split(':')
+ if not ac:
+ self._syntax_error("wmm item must have ac prefix")
+ except ValueError:
+ self._syntax_error("access category must be followed by colon")
+ p = tuple([int(v.split('=', 1)[1]) for v in cval.split(',')])
+ self._validate_input(*p)
+ for v, b in zip(p, bytcnt):
+ if self._validate_size(v, b):
+ self._syntax_error("unexpected input size expect %d got %d"
+ % (b, v))
+
+ for r in self._current_regions:
+ self._wmm_rules[r][ac] = p
+
def _parse_country(self, line):
try:
cname, cvals= line.split(':', 1)
@@ -248,6 +358,7 @@
for cname in cnames:
if len(cname) != 2:
self._warn("country '%s' not alpha2" % cname)
+ cname = cname.encode('ascii')
if not cname in self._countries:
self._countries[cname] = Country(dfs_region, comments=self._comments)
self._current_countries[cname] = self._countries[cname]
@@ -290,6 +401,15 @@
line = line.split(',')
pname = line[0]
flags = line[1:]
+ w = None
+ if flags and 'wmmrule' in flags[-1]:
+ try:
+ region = flags.pop().split('=', 1)[1]
+ if region not in self._wmm_rules.keys():
+ self._syntax_error("No wmm rule for %s" % region)
+ except IndexError:
+ self._syntax_error("flags is empty list or no region was found")
+ w = WmmRule(*self._wmm_rules[region].values())
if not bname in self._bands:
self._syntax_error("band does not exist")
@@ -303,10 +423,10 @@
b = self._bands[bname]
p = self._power[pname]
try:
- perm = Permission(b, p, flags)
- except FlagError, e:
+ perm = Permission(b, p, flags, w)
+ except FlagError as e:
self._syntax_error("Invalid flag '%s'" % e.flag)
- for cname, c in self._current_countries.iteritems():
+ for cname, c in self._current_countries.items():
if perm in c:
self._warn('Rule "%s, %s" added to "%s" twice' % (
bname, pname, cname))
@@ -315,6 +435,7 @@
def parse(self, f):
self._current_countries = None
+ self._current_regions = None
self._bands = {}
self._power = {}
self._countries = {}
@@ -326,6 +447,7 @@
self._powerdup = {}
self._bandline = {}
self._powerline = {}
+ self._wmm_rules = defaultdict(lambda: OrderedDict())
self._comments = []
@@ -337,6 +459,7 @@
self._comments.append(line[1:].strip())
line = line.replace(' ', '').replace('\t', '')
if not line:
+ self._current_regions = None
self._comments = []
line = line.split('#')[0]
if not line:
@@ -344,23 +467,35 @@
if line[0:4] == 'band':
self._parse_band(line[4:])
self._current_countries = None
+ self._current_regions = None
self._comments = []
elif line[0:5] == 'power':
self._parse_power(line[5:])
self._current_countries = None
+ self._current_regions = None
self._comments = []
elif line[0:7] == 'country':
self._parse_country(line[7:])
self._comments = []
+ self._current_regions = None
elif self._current_countries is not None:
+ self._current_regions = None
self._parse_country_item(line)
self._comments = []
+ elif line[0:7] == 'wmmrule':
+ self._parse_wmmrule(line[7:])
+ self._current_countries = None
+ self._comments = []
+ elif self._current_regions is not None:
+ self._parse_wmmrule_item(line)
+ self._current_countries = None
+ self._comments = []
else:
self._syntax_error("Expected band, power or country definition")
countries = self._countries
bands = {}
- for k, v in self._bands.iteritems():
+ for k, v in self._bands.items():
if k in self._bands_used:
bands[self._banddup[k]] = v
continue
@@ -369,7 +504,7 @@
self._lineno = self._bandline[k]
self._warn('Unused band definition "%s"' % k)
power = {}
- for k, v in self._power.iteritems():
+ for k, v in self._power.items():
if k in self._power_used:
power[self._powerdup[k]] = v
continue
diff --git /dev/null b/db2fw.py
--- /dev/null
+++ b/db2fw.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+
+from io import BytesIO, open
+import struct
+import hashlib
+from dbparse import DBParser
+import sys
+from math import log
+
+MAGIC = 0x52474442
+VERSION = 20
+
+if len(sys.argv) < 3:
+ print('Usage: %s output-file input-file' % sys.argv[0])
+ sys.exit(2)
+
+def create_rules(countries):
+ result = {}
+ for c in countries.values():
+ for rule in c.permissions:
+ result[rule] = 1
+ return list(result)
+
+def create_collections(countries):
+ result = {}
+ for c in countries.values():
+ result[(c.permissions, c.dfs_region)] = 1
+ return list(result)
+
+def create_wmms(countries):
+ result = {}
+ for c in countries.values():
+ for rule in c.permissions:
+ if rule.wmmrule is not None:
+ result[rule.wmmrule] = 1
+ return list(result)
+
+def be32(output, val):
+ output.write(struct.pack('>I', val))
+def be16(output, val):
+ output.write(struct.pack('>H', val))
+
+class PTR(object):
+ def __init__(self, output):
+ self._output = output
+ self._pos = output.tell()
+ be16(output, 0)
+ self._written = False
+
+ def set(self, val=None):
+ if val is None:
+ val = self._output.tell()
+ assert val & 3 == 0
+ self._offset = val
+ pos = self._output.tell()
+ self._output.seek(self._pos)
+ be16(self._output, val >> 2)
+ self._output.seek(pos)
+ self._written = True
+
+ def get(self):
+ return self._offset
+
+ @property
+ def written(self):
+ return self._written
+
+p = DBParser()
+countries = p.parse(open(sys.argv[2], 'r', encoding='utf-8'))
+rules = create_rules(countries)
+rules.sort()
+collections = create_collections(countries)
+collections.sort()
+wmms = create_wmms(countries)
+wmms.sort()
+
+output = BytesIO()
+
+# struct regdb_file_header
+be32(output, MAGIC)
+be32(output, VERSION)
+
+country_ptrs = {}
+countrynames = list(countries)
+countrynames.sort()
+for alpha2 in countrynames:
+ coll = countries[alpha2]
+ output.write(struct.pack('>2s', alpha2))
+ country_ptrs[alpha2] = PTR(output)
+output.write(b'\x00' * 4)
+
+wmmdb = {}
+for w in wmms:
+ assert output.tell() & 3 == 0
+ wmmdb[w] = output.tell() >> 2
+ for r in w._as_tuple():
+ ecw = int(log(r[0] + 1, 2)) << 4 | int(log(r[1] + 1, 2))
+ ac = (ecw, r[2],r[3])
+ output.write(struct.pack('>BBH', *ac))
+
+reg_rules = {}
+flags = 0
+for reg_rule in rules:
+ freq_range, power_rule, wmm_rule = reg_rule.freqband, reg_rule.power, reg_rule.wmmrule
+ reg_rules[reg_rule] = output.tell()
+ assert power_rule.max_ant_gain == 0
+ flags = 0
+ # convert to new rule flags
+ assert reg_rule.flags & ~0x899 == 0
+ if reg_rule.flags & 1<<0:
+ flags |= 1<<0
+ if reg_rule.flags & 1<<3:
+ flags |= 1<<1
+ if reg_rule.flags & 1<<4:
+ flags |= 1<<2
+ if reg_rule.flags & 1<<7:
+ flags |= 1<<3
+ if reg_rule.flags & 1<<11:
+ flags |= 1<<4
+ rule_len = 16
+ cac_timeout = 0 # TODO
+ if not (flags & 1<<2):
+ cac_timeout = 0
+ if cac_timeout or wmm_rule:
+ rule_len += 2
+ if wmm_rule is not None:
+ rule_len += 2
+ output.write(struct.pack('>BBHIII', rule_len, flags, int(power_rule.max_eirp * 100),
+ int(freq_range.start * 1000), int(freq_range.end * 1000), int(freq_range.maxbw * 1000),
+ ))
+ if rule_len > 16:
+ output.write(struct.pack('>H', cac_timeout))
+
+ if rule_len > 18:
+ be16(output, wmmdb[wmm_rule])
+
+ while rule_len % 4:
+ output.write('\0')
+ rule_len += 1
+
+for coll in collections:
+ for alpha2 in countrynames:
+ if (countries[alpha2].permissions, countries[alpha2].dfs_region) == coll:
+ assert not country_ptrs[alpha2].written
+ country_ptrs[alpha2].set()
+ slen = 3
+ output.write(struct.pack('>BBBx', slen, len(list(coll[0])), coll[1]))
+ coll = list(coll[0])
+ for regrule in coll:
+ be16(output, reg_rules[regrule] >> 2)
+ if len(coll) % 2:
+ be16(output, 0)
+
+for alpha2 in countrynames:
+ assert country_ptrs[alpha2].written
+
+outfile = open(sys.argv[1], 'wb')
+outfile.write(output.getvalue())