3. 南京来可USBCAN-132B模块

  • 完成模块收发CAN报文

import numpy as np
from matplotlib import pyplot as plt

# Noisy parabola sampled at 100 evenly spaced points from 1 to 100.
x = np.linspace(1, 100, 100)
y = 0.01 * x**2 - x + 123
noise = np.random.random(x.size)
y = y + noise
plt.plot(x, y, label='original')

# Non-recursive 3-point moving average: every output sample is computed from
# the *unfiltered* neighbours, so the whole interior can be done with slices.
# The two end points are copied through unchanged.
y2 = np.zeros(100)
y2[0], y2[99] = y[0], y[99]
y2[1:99] = (y[:98] + y[1:99] + y[2:]) / 3
y2 += 10  # vertical offset so the curves do not overlap in the plot
plt.plot(x, y2, label='FIR')

# Recursive variant: y is overwritten in place, so each step already sees the
# filtered value of its left neighbour. This must stay a sequential loop.
for k in range(1, 99):
    y[k] = (y[k - 1] + y[k] + y[k + 1]) / 3
y += 15  # vertical offset, as above
plt.plot(x, y, label='IIR')

plt.legend()
plt.show()
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[1], line 1
----> 1 import numpy as np
      2 from matplotlib import pyplot as plt
      4 x = np.linspace(1,100,100)

ModuleNotFoundError: No module named 'numpy'
import sys
import platform
import ctypes
import struct
import binascii
import enum
from typing import Any


# Report the interpreter in use. The bitness reported by
# platform.architecture() is what later selects the 32-bit or 64-bit CanCmd.dll.
print(f'系统版本:{sys.version}')
print(f'Python版本:{platform.python_version()}')
print(f'Python平台:{platform.architecture()}')

# if platform.architecture()[0] == '32bit':
#     _dll = ctypes.cdll.LoadLibrary(r"./NJLike/32bit/CanCmd.dll")
# else:
#     _dll = ctypes.cdll.LoadLibrary(r"./NJLike/64bit/CanCmd.dll")

# ACUSB_132B = 2
# USB_INDEX = 0
# handle = 0

# def can_open(deviceType:int,usbIndex:int) -> int :
#     if _dll:
#         global handle 
#         handle = _dll.CAN_DeviceOpen(deviceType,usbIndex,ctypes.c_char_p(b""))
#         if not handle:
#             raise Exception(f"打开{usbIndex=}设备{deviceType=}失败!")
#         return handle
#     else:
#         raise Exception(f"加载'CanCmd.dll'失败!")

# if can_open(ACUSB_132B,0):
#     print(f"打开设备成功 {handle = }")

##############################################################################################################

class WorkMode(enum.Enum):
    """Channel work mode; CanInitConfig.set_WorkMode writes the value to byte 0 of its buffer."""
    Normal = 0
    Listen = 1

class DeviceType(enum.Enum):
    """Adapter model; AcUsbCan passes the value as the first argument of CAN_DeviceOpen."""
    ACUSB_131B = 1
    ACUSB_132B = 2

class Baudrate(enum.Enum):
    """Bus speed codes; CanInitConfig.set_Baudrate stores the value in the first 32-bit field.

    Member names start with an underscore only because an identifier cannot
    begin with a digit.
    """
    # NOTE(review): the values look like SJA1000 bus-timing register pairs
    # (set_Baudrate also forces "SJA1000 mode") - confirm against the vendor docs.
    _5Kbps = 0xffbf
    _10Kbps = 0x1c31
    _20Kbps = 0x1c18
    _50Kbps = 0x1c09
    _100Kbps = 0x1c04
    _125Kbps = 0x1c03
    _250Kbps = 0x1c01
    _500Kbps = 0x1c00
    _800Kbps = 0x1600
    _1000Kbps = 0x1400

class FilterMode(enum.Enum):
    """Acceptance-filter mode; CanInitConfig.set_Filter stores the value in struct field 5."""
    Filter_None = 0
    Filter_Double = 1
    Filter_Single = 2

##############################################################################################################

class CanInitConfig():
    """Channel initialisation parameters kept in a raw, DLL-ready byte buffer.

    The buffer follows the little-endian layout ``'< 2B 3I 2B'``. Field
    indices as used by this class:

    * 0 (B) work mode
    * 1 (B) set to 1 by ``set_Baudrate`` (fixed "SJA1000 mode")
    * 2 (I) baud-rate code
    * 3 (I) AccCode
    * 4 (I) MaskCode
    * 5 (B) filter mode
    * 6 (B) never written by this class

    ``_b`` is the ctypes buffer handed to the DLL; ``_s`` is the matching
    ``struct.Struct`` used to read and patch it.
    """

    def __init__(self) -> None:
        self._s = struct.Struct('< 2B 3I 2B')
        self._b = ctypes.create_string_buffer(self._s.size)

    def _set_field(self, index: int, value: int) -> None:
        """Overwrite one struct field in the buffer, leaving the others as they are."""
        fields = list(self._s.unpack_from(self._b, 0))
        fields[index] = value
        self._s.pack_into(self._b, 0, *fields)

    @staticmethod
    def _enum_name(enum_cls, raw: int) -> str:
        """Return the member name for *raw*, or a placeholder when no member matches."""
        try:
            return enum_cls(raw).name
        except ValueError:
            return "<unknown {0:#x}>".format(raw)

    def set_WorkMode(self, WorkMode_xx: WorkMode) -> None:
        """Store the work mode in byte 0."""
        self._b[0] = WorkMode_xx.value

    def set_Baudrate(self, Baudrate_xx: Baudrate) -> None:
        """Store the baud-rate code and mark the timing mode byte."""
        self._b[1] = 1  # fixed to SJA1000 mode
        self._set_field(2, Baudrate_xx.value)

    def set_Filter(self, FilterMode_xx: FilterMode) -> None:
        """Store the acceptance-filter mode."""
        self._set_field(5, FilterMode_xx.value)

    def set_Acc(self, AccCode: int):
        """Store the 32-bit acceptance code."""
        self._set_field(3, AccCode)

    def set_Mask(self, MaskCode: int):
        """Store the 32-bit acceptance mask."""
        self._set_field(4, MaskCode)

    def __str__(self) -> str:
        """Render the decoded fields followed by a hex dump of the raw buffer.

        A field whose value matches no enum member (for example the baud rate
        of a freshly created config, which is still 0) is shown as
        ``<unknown 0x..>`` instead of raising ``ValueError``.
        """
        t = self._s.unpack_from(self._b, 0)
        parts = [
            "{0:8s}: {1}\r\n".format("工作模式", self._enum_name(WorkMode, t[0])),
            "{0:9s}: {1}\r\n".format("波特率", self._enum_name(Baudrate, t[2])),
            "{0:8s}: {1}\r\n".format("滤波模式", self._enum_name(FilterMode, t[5])),
            "{0:12s}: {1:#0x}\r\n".format("AccCode", t[3]),
            "{0:12s}: {1:#0x}\r\n".format("MaskCode", t[4]),
            "原始数据: {0}\r\n解码数据: {1}".format(
                binascii.hexlify(self._b.raw, " ", 1).decode(), t),
        ]
        return "".join(parts)

##############################################################################################################
    
class CanChannel(object):
    """One CAN channel of an already opened device, driven through the vendor DLL."""

    def __init__(self, dll: ctypes.CDLL, handle: int, channel: int) -> None:
        """Remember the DLL, the device handle and the channel index; nothing is started yet."""
        self._dll = dll
        self._handle = handle
        self._channel = channel
        self._start = False
        # Defined up front so the attribute exists before the first successful start().
        self._cfg = None

    def start(self, cfg: CanInitConfig) -> bool:
        """Start the channel with *cfg*.

        Returns True when the channel is running. If it was already started
        the DLL is not called again and *cfg* is ignored.
        """
        if self._start:
            return True

        # The DLL receives a copy of the config bytes.
        result = self._dll.CAN_ChannelStart(
            self._handle, self._channel, ctypes.c_char_p(cfg._b.raw))
        if not result:
            return False
        self._start = True
        self._cfg = cfg
        return True

    def stop(self) -> bool:
        """Stop the channel; returns True when the DLL reports success."""
        if not self._dll.CAN_ChannelStop(self._handle, self._channel):
            return False
        self._start = False
        return True

    def __str__(self) -> str:
        """Return the active configuration as text, or an empty string when not started."""
        return str(self._cfg) if self._start else ""


##############################################################################################################

class AcUsbCan (object):
    """A USB-CAN adapter accessed through the vendor ``CanCmd.dll``.

    After a successful ``open()`` the channel objects are available as
    ``CAN0`` and, when the device reports more than one channel, ``CAN1``.
    """

    def __init__(self, dll: str, DeviceType_xx: DeviceType, usbIndex: int = 0) -> None:
        """Load the DLL at path *dll*; the device itself is not opened yet.

        Any error from ``ctypes.cdll.LoadLibrary`` propagates to the caller.
        """
        self._device = DeviceType_xx.value
        self._usbIndex = usbIndex
        self._information = ()
        self._handle = 0
        self._dll = ctypes.cdll.LoadLibrary(dll)

    def _channel_count(self) -> int:
        """Channel count from the device info, or 0 when no info has been read."""
        return self._information[5] if self._information else 0

    @staticmethod
    def _text(raw: bytes) -> str:
        """Decode a fixed-width text field, dropping its trailing NUL padding."""
        return raw.rstrip(b"\x00").decode()

    def open(self) -> bool:
        """Open the device and create its channel objects.

        Raises:
            Exception: when the DLL returns a zero handle.
        """
        self._handle = self._dll.CAN_DeviceOpen(
            self._device, self._usbIndex, ctypes.c_char_p(b""))
        # Check the handle first: with a failed open the device info stays
        # empty, and indexing it would hide the real error behind an IndexError.
        if self._handle == 0:
            raise Exception("打开 usb 索引 {0}{1} 设备失败!".format(
                self._usbIndex, DeviceType(self._device).name))

        self.get_deviceInfo()
        self.CAN0 = CanChannel(self._dll, self._handle, 0)
        if self._channel_count() > 1:
            self.CAN1 = CanChannel(self._dll, self._handle, 1)
        return True

    def close(self) -> bool:
        """Close the device; on success the channel attributes are removed.

        Safe to call when the device was never opened or the device info is
        missing: absent channel attributes are simply skipped.
        """
        if not self._dll.CAN_DeviceClose(self._handle):
            return False
        for name in ("CAN0", "CAN1"):
            self.__dict__.pop(name, None)
        return True

    def get_deviceInfo(self) -> bool:
        """Read the device-information record into ``self._information``.

        The record is unpacked as ``'< 5H B 20s 40s 20s'``. On failure
        ``_information`` is reset to an empty tuple and False is returned.
        """
        s = struct.Struct('< 5H B 20s 40s 20s')
        b = ctypes.create_string_buffer(s.size)
        if self._dll.CAN_GetDeviceInfo(self._handle, b):
            self._information = s.unpack_from(b, 0)
            return True
        self._information = ()
        return False

    def __str__(self) -> str:
        """Describe the device, or give a hint when no device info is available."""
        info = self._information
        if not info:
            return "<class AcUsbCan> id:{0:#x} \r\nuse 'get_deviceInfo()' for more information.".format(id(self))

        # The four version words are shown as <high byte>.<low byte> in hex.
        lines = [
            "{0:8}: {1:x}.{2:x} \r\n".format("硬件版本", info[0] >> 8, info[0] & 0xff),
            "{0:8}: {1:x}.{2:x} \r\n".format("固件版本", info[1] >> 8, info[1] & 0xff),
            "{0:8}: {1:x}.{2:x} \r\n".format("驱动版本", info[2] >> 8, info[2] & 0xff),
            "{0:7}: {1:x}.{2:x} \r\n".format("接口库版本", info[3] >> 8, info[3] & 0xff),
            "{0:9}: {1} \r\n".format("中断号", info[4]),
            "{0:9}: {1} \r\n".format("总路数", info[5]),
            "{0:7}: {1} \r\n".format("设备序列号", self._text(info[6])),
            "{0:8}: {1} \r\n".format("硬件类型", self._text(info[7])),
            "{0:8}: {1}".format("设备描述", self._text(info[8])),
        ]
        return "".join(lines)


##############################################################################################################

from types import DynamicClassAttribute

class SendType(enum.Enum):
    """Frame send mode stored in a frame's send-type byte."""
    Normal = 0
    Once = 1
    Lookback = 2
    LookbackOnce = 3

    @DynamicClassAttribute
    def annotation(self) -> str:
        """Return the Chinese display text for this send mode."""
        # Indexed by member value (0..3).
        descriptions = (
            "正常发送模式",
            "单次正常发送",
            "正常自发自收",
            "单次自发自收",
        )
        return descriptions[self.value]

   

class DataFrame(ctypes.Structure):
    """A single CAN frame laid out as a ctypes structure.

    The payload field is eight ``c_byte`` (signed) elements, so the ``data``
    property yields values in -128..127.
    """

    _fields_ = [
        ('_TimeFlag', ctypes.c_uint32),
        ('_SendType', ctypes.c_byte),
        ('_RemoteFlag',ctypes.c_byte),
        ('_ExternFlag',ctypes.c_byte),
        ('_DataLen',ctypes.c_int8,),
        ('_ID',ctypes.c_uint32),
        ('_arrayData',ctypes.c_byte*8)
    ]

    # CAN identifiers are 11 bits wide in standard frames and 29 bits in extended frames.
    _STD_ID_MASK = 0x7FF
    _EXT_ID_MASK = 0x1FFFFFFF

    def __init__(self, id:int = 0,data:bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00',isExtend:bool = True,
                 sendType:SendType=SendType.Lookback,isRemote:bool=False,*args: Any, **kw: Any) -> None:
        """Build a frame.

        Args:
            id: frame identifier.
            data: payload; only the first 8 bytes are used.
            isExtend: True for an extended frame, False for a standard one.
            sendType: send mode stored in the frame.
            isRemote: True for a remote frame.
            *args, **kw: forwarded to ``ctypes.Structure.__init__``.
        """
        super().__init__(*args, **kw)
        self._SendType = sendType.value
        self._RemoteFlag = 1 if isRemote else 0
        self._ExternFlag = 1 if isExtend else 0
        self._ID = id
        self.data = data  # fills the payload array and the length field

    @property
    def data(self):
        """All 8 payload slots as a tuple of signed ints (unused slots are 0)."""
        return tuple(self._arrayData)
    @data.setter
    def data(self, value:bytes):
        # Keep at most 8 bytes and zero-fill the rest of the fixed-size array.
        payload = bytes(value[:8])
        self._arrayData = (ctypes.c_byte*8).from_buffer_copy(payload.ljust(8, b'\x00'))
        self._DataLen = len(payload)

    @property
    def dlc(self):
        """Number of payload bytes in use (0..8)."""
        return self._DataLen

    @property
    def id(self):
        """Identifier masked to 29 bits (extended frame) or 11 bits (standard frame)."""
        mask = self._EXT_ID_MASK if self._ExternFlag else self._STD_ID_MASK
        return self._ID & mask
    @id.setter
    def id(self,value:int):
        # Write the structure field itself; a plain `ID` attribute would not
        # be part of the frame.
        self._ID = value

    @property
    def remote(self):
        """True when the remote flag is 1."""
        return self._RemoteFlag == 1
    @remote.setter
    def remote(self,value:bool):
        self._RemoteFlag = 1 if value else 0

    @property
    def extend(self):
        """True when the extended-frame flag is 1."""
        return self._ExternFlag == 1
    @extend.setter
    def extend(self,value:bool):
        self._ExternFlag = 1 if value else 0

    @property
    def mode(self):
        """Display text of the current send mode (``SendType.annotation``)."""
        return SendType(self._SendType).annotation
    @mode.setter
    def mode(self,value:SendType):
        self._SendType = value.value

    def __str__(self) -> str:
        return super().__str__()
    
        
##############################################################################################################

# Pick the CanCmd.dll build that matches the interpreter's bitness.
if platform.architecture()[0] == '32bit':
    dll_filename = r"./NJLike/32bit/CanCmd.dll"
else:
    dll_filename = r"./NJLike/64bit/CanCmd.dll"

# Open the first ACUSB-132B adapter (usbIndex defaults to 0).
usbcan = AcUsbCan(dll_filename,DeviceType.ACUSB_132B)
usbcan.open()
# usbcan.get_deviceInfo()
# print(usbcan)

# Build the channel configuration: listen-only, 125 kbit/s, single filter.
init = CanInitConfig()
init.set_WorkMode(WorkMode.Listen)
init.set_Baudrate(Baudrate._125Kbps)
init.set_Filter(FilterMode.Filter_Single)
init.set_Acc(0xAA55)
init.set_Mask(0x55AA)

# usbcan.CAN0.start(init)
can0 = usbcan.CAN0
can0.start(init)
print(can0)
# import time
# time.sleep(3)
# can0.stop()

# Exercise the DataFrame accessors.
data = DataFrame()
# Assign through the `id` property: DataFrame has no `ID` field or property,
# so `data.ID = ...` only created an unrelated Python attribute.
data.id = 0x12345
data.extend = False
print(data)
print(data.data)
print(data.dlc)
print(hex(data.id))
data.data = b'ABC'
print(data.data)
print(data.dlc)
print(hex(data.id))

print(data._fields_)
print(data._fields_[6][1][0])
print(data.mode)
data.mode = SendType.Normal
print(data.mode)

# struct.pack("< I 4B I 8B",*(1,2,3,4,5,6,1,2,3,4,5,6,7,8))
# struct.pack("< I 4B I B",data._fields_)

# struct.pack("< I 4B I 8B",*data._fields_)
print(type(data._fields_))

# Hex dump of the frame's raw bytes (shown as the notebook cell result).
memoryview(data).hex()
import datetime

# Current local time and the epoch, formatted the same way.
t = datetime.datetime.now()
print(t.strftime("%Y/%m/%d %H:%M:%S"))

t1 = datetime.datetime.fromtimestamp(0)
print(t1.strftime("%Y/%m/%d %H:%M:%S"))
# a = (1,2,3)
# b = ('a','b','c','d')

# list(zip(a,b))

# append() adds the slice as ONE nested element: b becomes [1, 2, 3, [4, 5]].
a =[1,2,3,4,5]
b = list(a[:3])
b.append(a[3:])
print(a,b)
a.pop(3)
print(a)

# A NUL-padded fixed-width text field.
t = b'12042504\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
print(t)
print(t.decode())
# bytes are immutable, so the stripped value has to be rebound; rstrip drops
# every trailing NUL, whereas removesuffix(b'\x00') would drop only one.
t = t.rstrip(b'\x00')
# t.replace(b'\x00','')
# repr() makes any remaining non-printable character visible.
print(repr(t.decode()))
from USBCAN import *
import threading
from time import sleep

def receive_work(ch:Channel, poll_interval:float = 0.001):
    """Poll *ch* forever and print every frame that is received.

    Intended as a thread target; the function never returns.

    Args:
        ch: channel to read from. Uses its ``count``, ``receive()``,
            ``data_receive_time()`` and ``channel`` members.
        poll_interval: seconds to sleep when no frame is pending, so the
            loop does not spin at full CPU while the bus is idle.
    """
    while True:
        if not ch.count:
            sleep(poll_interval)
            continue

        _rec = ch.receive()
        if _rec:
            worker = threading.current_thread()
            print(f'{worker.name}({worker.native_id:#010x})',
                  f'{ch.data_receive_time(_rec)} CAN{ch.channel} {str(_rec)}')



# Send/receive demo built on the names star-imported from USBCAN.
# NOTE(review): the channel attribute is spelled `CANO` (letter O), not `CAN0`
# (digit zero) - confirm against the USBCAN module.
dev = AcusbCan_132B()

dev.CANO.start()
dat = CanDataFrame(0x12345678,b'abcdefgh')

# First burst: ten frames sent before the receive thread exists.
for i in range(10):
    print('*'*40)
    if not dev.CANO.send(dat):
        print(f'{dev.CANO.msg_error = }')

# Daemon thread running receive_work on the same channel.
rx_thread = threading.Thread(target=receive_work,args=(dev.CANO,),name="CAN接收线程",daemon=True)

# Thread count before and after the receiver is started.
print(f'{threading.active_count() = }')
rx_thread.start()
# rx_thread.run()
print(f'{threading.active_count() = }')

print(f'{threading.current_thread().name = }')

# Second burst: ten more frames, now with the receiver running.
for i in range(10):
    print('#######################################')
    if not dev.CANO.send(dat):
        print(f'{dev.CANO.msg_error = }')

# `del` only removes the name; it does not stop the thread.
# NOTE(review): receive_work loops forever, so the thread may still be polling
# the channel when dev.close() runs - verify that this is safe.
del rx_thread
dev.close()
print('-'*40)
import threading
import time

# Value handed to the worker thread.
a = 5

def test(a):
    """Print a line containing *a* every 5 seconds, forever (thread target)."""
    while True:
        print('我是子线程', a)
        time.sleep(5)

# daemon=True: the worker does not keep the interpreter alive after the main thread ends.
thread = threading.Thread(target=test, args=(a, ),daemon=True)
thread.start()

# Main thread: one line per second for five seconds.
for i in range(5):
    print('我是主线程')
    time.sleep(1)
from broadcast_service import broadcast_service


# callback of common method
def handle_msg(params):
    """Print the payload received by the explicitly subscribed callback."""
    line = f"handle_msg receive params: {params}"
    print(line)


# callback of decorator
@broadcast_service.on_listen(['my_topic'])
def handle_decorator_msg(params):
    """Print the payload received by the decorator-registered 'my_topic' listener."""
    line = f"handle_decorator_msg receive params: {params}"
    print(line)

# Demo entry point: subscribe handle_msg to 'my_topic', then publish one message.
# The subscription must be made before publishing. The captured output that
# follows this cell shows both handle_decorator_msg and handle_msg receiving
# the message.
if __name__ == '__main__':
    info = 'This is very important msg'

    # subscribe topic
    broadcast_service.subscribe('my_topic', handle_msg)

    # publish broadcast
    broadcast_service.publish('my_topic', info)
handle_decorator_msg receive params: This is very important msg
handle_msg receive params: This is very important msg