pygds/pygds/reader.py

150 lines
3.7 KiB
Python

import abc
import calendar
import ctypes
from datetime import datetime

from .record import *
class ProgressGetter(metaclass=abc.ABCMeta):
    """Abstract interface for objects that can report their progress.

    Concrete subclasses must provide the ``current`` and ``total``
    properties and the ``progress()`` method; the class cannot be
    instantiated until all three are overridden.
    """

    # ``abc.abstractproperty`` is deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported spelling.
    @property
    @abc.abstractmethod
    def current(self):
        """Current position; the meaning is defined by the subclass."""

    @property
    @abc.abstractmethod
    def total(self):
        """Total amount; the meaning is defined by the subclass."""

    @abc.abstractmethod
    def progress(self):
        """Return the progress; the base implementation always raises.

        Raises:
            NotImplementedError: always, when reached through ``super()``.
        """
        raise NotImplementedError("getting the progess is not implemented")
class Reader(ProgressGetter):
    """Big-endian binary reader over a seekable byte stream.

    All ``read_*`` primitives return ``None`` when the stream ends before
    the requested number of bytes could be read. Progress is reported as the
    current stream offset relative to the size measured at construction.
    """

    def __init__(self, file):
        """Wrap *file* and measure its size.

        Args:
            file: a seekable binary stream. It is rewound to offset 0.
        """
        self.stream = file
        # Find the stream size: seek to the end (whence=2), remember the
        # offset, then rewind to the start (whence=0).
        self.stream.seek(0, 2)
        self._total = self.stream.tell()
        self.stream.seek(0, 0)

    @property
    def current(self):
        """Current stream offset in bytes, as a float."""
        return float(self.stream.tell())

    @property
    def total(self):
        """Stream size in bytes as measured in ``__init__``."""
        return self._total

    def progress(self):
        """Return the read progress as a fraction of the stream size.

        The value is ``current / total`` (0.0 at the start, 1.0 at the end);
        it is a ratio, not a percentage. A stream whose measured size is
        zero or negative reports 1.0.
        """
        if self.total <= 0:
            return 1.0
        return self.current / self.total

    def skip(self, n):
        """Consume and discard *n* bytes by reading them from the stream."""
        self.stream.read(n)

    def _read_int(self, size, signed):
        """Read a *size*-byte big-endian integer; ``None`` on a short read."""
        raw = self.stream.read(size)
        if len(raw) != size:
            return None
        return int.from_bytes(raw, "big", signed=signed)

    def read_uint(self):
        """Read a 4-byte big-endian unsigned integer, or ``None`` at EOF."""
        return self._read_int(4, signed=False)

    def read_ushort(self):
        """Read a 2-byte big-endian unsigned integer, or ``None`` at EOF."""
        return self._read_int(2, signed=False)

    def read_short(self):
        """Read a 2-byte big-endian signed integer, or ``None`` at EOF."""
        return self._read_int(2, signed=True)

    def read_int(self):
        """Read a 4-byte big-endian signed integer, or ``None`` at EOF."""
        return self._read_int(4, signed=True)

    def read_double(self):
        """Read an 8-byte base-16 floating point number.

        Layout: bit 7 of the first byte is the sign, its low 7 bits are an
        excess-64 exponent of 16, and the remaining 7 bytes are a big-endian
        fraction in the range [0, 1).
        NOTE(review): this matches the GDSII 8-byte real layout - confirm
        against the format specification.

        Returns:
            The decoded value as a float (``0.0`` when all eight bytes are
            zero), or ``None`` if fewer than 8 bytes were available.
        """
        raw = self.stream.read(8)
        if len(raw) != 8:
            return None
        if not any(raw):
            # All-zero encoding: return a float so the type is consistent.
            return 0.0
        # 56-bit fraction: the first mantissa byte has weight 2**-8.
        value = int.from_bytes(raw[1:], "big") / 2.0 ** 56
        exponent = (raw[0] & 0x7F) - 64
        value *= 16 ** exponent
        if raw[0] & 0x80:
            value = -value
        return value

    def read_ascii(self, len):
        """Read *len* bytes and decode them as ASCII text.

        Every NUL character is removed and leading/trailing whitespace is
        stripped.

        Raises:
            UnicodeDecodeError: if the bytes are not valid ASCII.
        """
        raw = self.stream.read(len)
        return raw.decode("ASCII").replace("\x00", "").strip()

    def read_record(self):
        """Read a record header: unsigned 16-bit length, signed 16-bit ident.

        ``Record`` and ``Records`` are presumably provided by the star import
        from ``.record``. The ident is converted with ``Records(ident)``; if
        that raises ``ValueError`` the exception object itself is stored in
        ``ident`` so the caller can inspect the unknown value.

        Returns:
            A ``Record`` whose ``len`` is the payload length (stored length
            minus the 4 header bytes), or ``None`` if the stream ended
            before a complete header could be read.
        """
        length = self.read_ushort()
        ident = self.read_short()
        if length is None or ident is None:
            # Truncated header: follow the None-on-EOF convention of the
            # primitive readers instead of failing on None arithmetic.
            return None
        result = Record()
        result.len = length - 4  # remove record header len
        try:
            result.ident = Records(ident)
        except ValueError as e:
            result.ident = e
        return result

    def read_date(self):
        """Read six unsigned 16-bit fields and build a ``datetime``.

        Field order is year, month, day, hour, minute, second. The year is
        used as stored (no offset is applied). Out-of-range values are
        clamped to the nearest value ``datetime`` accepts, including the
        real length of the month, so malformed timestamps do not raise.

        Returns:
            The ``datetime``, or ``None`` if the stream ended before all
            six fields could be read.
        """
        fields = [self.read_ushort() for _ in range(6)]
        if None in fields:
            return None
        year, month, day, hour, minute, second = fields
        year = max(datetime.min.year, min(datetime.max.year, year))
        month = max(1, min(12, month))
        # Clamp to the actual month length (handles e.g. Feb 30, Apr 31).
        last_day = calendar.monthrange(year, month)[1]
        day = max(1, min(last_day, day))
        hour = max(0, min(23, hour))
        minute = max(0, min(59, minute))
        second = max(0, min(59, second))
        return datetime(year=year, month=month, day=day,
                        hour=hour, minute=minute, second=second)

    def read_coord(self):
        """Read one coordinate as an ``(X, Y)`` tuple of signed 32-bit ints.

        Either member is ``None`` if the stream ended while reading it.
        """
        X = self.read_int()
        Y = self.read_int()
        return (X, Y)

    def read_coords(self, len):
        """Read ``len // 8`` coordinates from a payload of *len* bytes.

        Each coordinate occupies 8 bytes (two 32-bit integers). Trailing
        bytes that do not form a whole coordinate are not consumed.

        Returns:
            A list of ``(X, Y)`` tuples, or ``None`` if the stream ended
            before all coordinates could be read.
        """
        points = []
        # Floor division: true division would yield a float and read one
        # coordinate too many for lengths that are not a multiple of 8.
        for _ in range(int(len // 8)):
            point = self.read_coord()
            # A tuple is always truthy, so test its members for EOF.
            if None in point:
                return None
            points.append(point)
        return points