0

the original code which only support python 2 is here

link to thinkgear.py

I'm trying to edit it to support python 3. the edited code is here:

import sys

import serial

from io import BytesIO

import struct

from collections import namedtuple

import logging
import logging.handlers

import sys
import time

import datetime

global delta 
delta = []

_log = logging.getLogger(__name__)

_bytelog = logging.getLogger(__name__+'.bytes')
_bytelog.propagate = False
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
_log.addHandler(fh)

class ThinkGearProtocol(object):

    def __init__(self, port):

        self.serial = serial.Serial(port, 57600)
        self.preread = BytesIO()
        self.io = self.serial

    @staticmethod
    def _chksum(packet):
        return ~sum(c for c in packet ) & 0xff

    def _read(self, n):
        buf = self.io.read(n)
        if len(buf) < n:
            _log.debug('incomplete read, short %s bytes', n - len(buf))
            if self.io == self.preread:
                _log.debug('end of preread buffer')
                # self.preread.reset()
                # self.preread.truncate()
                # two line comment out
                self.io = self.serial
                buf += self.io.read(n-len(buf))
                if len(buf) < n:
                    _log.debug('incomplete read, short %s bytes', n - len(buf))

        for o in range(0, len(buf), 16):
            _bytelog.debug('%04X  '+' '.join(('%02X',)*len(buf[o:o+16])), o, *(c for c in buf[o:o+16]))

        return buf

    def _deread(self, buf):
        _log.debug('putting back %s bytes', len(buf))
        pos = self.preread.tell()
        self.preread.seek(0, 2)
        self.preread.write(buf)
        self.preread.seek(pos)
        self.io = self.preread

    def get_packets(self):
        last_two = ()
        while True:
            last_two = last_two[-1:]+(self._read(1),)
            # _log.debug('last_two: %r', last_two)
            if last_two == (b'\xAA',b'\xAA'):
                plen = self._read(1)
                if plen >= b'\xAA':
                    # Bogosity
                    _log.debug('discarding %r while syncing', last_two[0])
                    last_two = last_two[-1:]+(plen,)

                else:
                    last_two = ()
                    packet = self._read(int.from_bytes((plen), byteorder='big'))
                    # _log.debug(plen)
                    checksum = self._read(1)

                    if ord(checksum) == self._chksum(packet):
                        yield self._decode(packet)

                    else:
                        _log.debug('bad checksum')
                        self._deread(packet+checksum)

            elif len(last_two) == 2:
                _log.debug('discarding %r while syncing', last_two[0])


    def _decode(self, packet):
        decoded = []

        while packet:
            extended_code_level = 0
            while len(packet) and packet[0] == '\x55':
                extended_code_level += 1
                packet = packet[1:]
            if len(packet) < 2:
                _log.debug('ran out of packet: %r', '\x55'*extended_code_level+packet)
                break
            code = packet[0]
            if code < 0x80:
                value = packet[1]
                packet = packet[2:]
            else:
                vlen = packet[1]
                if len(packet) < 2+vlen:
                    _log.debug('ran out of packet: %r', '\x55'*extended_code_level+chr(code)+chr(vlen)+packet)
                    break
                value = packet[2:2+vlen]
                packet = packet[2+vlen:]

            # _log.debug('extended_code_level is '+str(extended_code_level))
            # _log.debug('code is '+str(code))  
            # _log.debug('data_types is '+str(data_types))
            # _log.debug(not extended_code_level and code in data_types)
            # _log.debug(not bool(extended_code_level and code in data_types))
            # _log.debug((extended_code_level,code) in data_types)
            if not bool(extended_code_level and code in data_types):
                data = data_types[code](extended_code_level, code, value)
                # _log.debug('extended_code_level is '+str(extended_code_level))
                # _log.debug('code is '+str(code))
                # _log.debug('value is '+str(value))
                # _log.debug('data_types is '+str(data_types))
            elif (extended_code_level,code) in data_types:
                data = data_types[(extended_code_level,code)](extended_code_level, code, value)

            else:
                data = ThinkGearUnknownData(extended_code_level, code, value)

            decoded.append(data)

        return decoded


data_types = {}

class ThinkGearMetaClass(type):
    def __new__(mcls, name, bases, data):
        cls = super(ThinkGearMetaClass, mcls).__new__(mcls, name, bases, data)
        code = getattr(cls, 'code', None)
        if code:
            data_types[code] = cls
            extended_code_level = getattr(cls, 'extended_code_level', None)
            if extended_code_level:
                data_types[(extended_code_level,code)] = cls
        return cls


class ThinkGearData(object, metaclass=ThinkGearMetaClass):
    def __init__(self, extended_code_level, code, value):
        self.extended_code_level = extended_code_level
        self.code = code
        # self.value = self._decode(value)
        self.value = value
        # _log.debug('123')
        if self._log:
            _log.log(self._log, '%s', self)

    @staticmethod

    def _decode(v):
        return v

    def __str__(self):
        return self._strfmt % vars(self)

    # __metaclass__ = ThinkGearMetaClass

    _log = logging.DEBUG


class ThinkGearUnknownData(ThinkGearData):
    '''???'''
    _strfmt = 'Unknown: code=%(code)02X extended_code_level=%(extended_code_level)s %(value)r'


class ThinkGearPoorSignalData(ThinkGearData):
    '''POOR_SIGNAL Quality (0-255)'''
    code = 0x02
    _strfmt = 'POOR SIGNAL: %(value)s'
    _decode = staticmethod(ord)


class ThinkGearAttentionData(ThinkGearData):
    '''ATTENTION eSense (0 to 100)'''
    code = 0x04
    _strfmt = 'ATTENTION eSense: %(value)s'
    _decode = staticmethod(ord)


class ThinkGearMeditationData(ThinkGearData):
    '''MEDITATION eSense (0 to 100)'''
    code = 0x05
    _strfmt = 'MEDITATION eSense: %(value)s'
    _decode = staticmethod(ord)


class ThinkGearRawWaveData(ThinkGearData):
    '''RAW Wave Value (-32768 to 32767)'''
    code = 0x80
    _strfmt = 'Raw Wave: %(value)s'
    _decode = staticmethod(lambda v: struct.unpack('>h', v)[0])
    # There are lots of these, don't log them by default
    _log = False


EEGPowerData = namedtuple('EEGPowerData', 'delta theta lowalpha highalpha lowbeta highbeta lowgamma midgamma')
delta_value = namedtuple('EEGPowerData', 'delta')
class ThinkGearEEGPowerData(ThinkGearData):
    '''Eight EEG band power values (0 to 16777215).
    
    delta, theta, low-alpha high-alpha, low-beta, high-beta, low-gamma, and
    mid-gamma EEG band power values.
    '''

    code = 0x83
    _strfmt = 'ASIC EEG Power: %(value)r'
    _decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in range(0, 24, 3)))))
    #print(EEGPowerData.delta)


def main():
    global packet_log
    packet_log = []
    logging.basicConfig(level=logging.DEBUG)

    for pkt in ThinkGearProtocol('COM3').get_packets():
        packet_log.append(pkt)
if __name__ == '__main__':
    main()

when running in python2, i get the result like this:

DEBUG:__main__:ASIC EEG Power: EEGPowerData(delta=7784, theta=7734, lowalpha=2035, highalpha=1979, lowbeta=2914, highbeta=3996, lowgamma=1944, midgamma=1847

when running in python3, the result is like this:

DEBUG:__main__:ASIC EEG Power: b'\x00\xa9\xf1\x00t%\x00\rK\x00\x18"\x00\x16%\x00\x1d6\x00OT\x00\x17\x84'

Anyone know how should i edit this line of code in order to make it work in python 3? Thank you

_decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in range(0, 24, 3)))))
9
  • Okay, it looks complete now, but it's also important that you make it minimal too. (read minimal reproducible example) -- the bug is only in the construction of the data_types dict. ---- anyway, the question is probably a duplicate of metaclass - __metaclass__ in Python 3 - Stack Overflow , if there isn't any other errors.. Commented Feb 1, 2021 at 6:20
  • really appreciate your help! it worked. but now have other errors... Commented Feb 1, 2021 at 6:27
  • DEBUG:__main__:ASIC EEG Power: b'\x00\r|\x00L\xa3\x00F,\x00\x0b\xca\x00\x1a\x80\x00)#\x006\xec\x00\x0cN' Commented Feb 1, 2021 at 6:29
  • What's the actual and expected behavior? (edit the question, including your fixed code.) Commented Feb 1, 2021 at 6:30
  • do you know why the EEG power is not decoded into 8 numbers? Commented Feb 1, 2021 at 6:30

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.