3. 南京来可USBCAN-132B模块#
完成模块收发CAN报文
import numpy as np
from matplotlib import pyplot as plt
x = np.linspace(1,100,100)
y = 0.01*x**2 -x +123
y = y+np.random.random(len(x))
plt.plot(x,y,label='original')
y2 = np.zeros((100))
y2[0] = y[0]
y2[99] = y[99]
for i in range(98):
y2[i+1] = (y[i]+y[i+1]+y[i+2])/3
y2 += 10
plt.plot(x,y2,label='FIR')
for i in range(98):
y[i+1] = (y[i]+y[i+1]+y[i+2])/3
y += 15
plt.plot(x,y,label='IIR')
plt.legend()
plt.show()
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[1], line 1
----> 1 import numpy as np
2 from matplotlib import pyplot as plt
4 x = np.linspace(1,100,100)
ModuleNotFoundError: No module named 'numpy'
import sys
import platform
import ctypes
import struct
import binascii
import enum
from typing import Any
print(f'系统版本:{sys.version}')
print(f'Pythn版本:{platform.python_version()}')
print(f'Python平台:{platform.architecture()}')
# if platform.architecture()[0] == '32bit':
# _dll = ctypes.cdll.LoadLibrary(r"./NJLike/32bit/CanCmd.dll")
# else:
# _dll = ctypes.cdll.LoadLibrary(r"./NJLike/64bit/CanCmd.dll")
# ACUSB_132B = 2
# USB_INDEX = 0
# handle = 0
# def can_open(deviceType:int,usbIndex:int) -> int :
# if _dll:
# global handle
# handle = _dll.CAN_DeviceOpen(deviceType,usbIndex,ctypes.c_char_p(b""))
# if not handle:
# raise Exception(f"打开{usbIndex=}设备{deviceType=}失败!")
# return handle
# else:
# raise Exception(f"加载'CanCmd.dll'失败!")
# if can_open(ACUSB_132B,0):
# print(f"打开设备成功 {handle = }")
##############################################################################################################
class WorkMode(enum.Enum):
Normal = 0
Listen = 1
class DeviceType(enum.Enum):
ACUSB_131B = 1
ACUSB_132B = 2
class Baudrate(enum.Enum):
_5Kbps = 0xffbf
_10Kbps = 0x1c31
_20Kbps = 0x1c18
_50Kbps = 0x1c09
_100Kbps = 0x1c04
_125Kbps = 0x1c03
_250Kbps = 0x1c01
_500Kbps = 0x1c00
_800Kbps = 0x1600
_1000Kbps = 0x1400
class FilterMode(enum.Enum):
Filter_None = 0
Filter_Double = 1
Filter_Single = 2
##############################################################################################################
class CanInitConfig():
def __init__(self) -> None:
self._s = struct.Struct('< 2B 3I 2B')
self._b = ctypes.create_string_buffer(self._s.size)
def set_WorkMode(self,WorkMode_xx:WorkMode) -> None:
self._b[0] = WorkMode_xx.value
def set_Baudrate(self,Baudrate_xx:Baudrate) -> None:
self._b[1] = 1 # 固定为SJA1000模式
l=list(self._s.unpack_from(self._b,0))
l[2] = Baudrate_xx.value
self._s.pack_into(self._b,0,*l)
def set_Filter(self,FilterMode_xx:FilterMode) -> None:
l = list(self._s.unpack_from(self._b,0))
l[5] = FilterMode_xx.value
self._s.pack_into(self._b,0,*l)
def set_Acc(self,AccCode:int):
l = list(self._s.unpack_from(self._b,0))
l[3] = AccCode
self._s.pack_into(self._b,0,*l)
def set_Mask(self,MaskCode:int):
l = list(self._s.unpack_from(self._b,0))
l[4] = MaskCode
self._s.pack_into(self._b,0,*l)
def __str__(self) -> str:
t = self._s.unpack_from(self._b,0)
s = "{0:8s}: {1}\r\n".format("工作模式",WorkMode(t[0]).name)
s += "{0:9s}: {1}\r\n".format("波特率",Baudrate(t[2]).name)
s += "{0:8s}: {1}\r\n".format("滤波模式",FilterMode(t[5]).name)
s += "{0:12s}: {1:#0x}\r\n".format("AccCode",t[3])
s += "{0:12s}: {1:#0x}\r\n".format("MaskCode",t[4])
s += "原始数据: {0}\r\n解码数据: {1}".format(binascii.hexlify(self._b.raw," ",1).decode(),self._s.unpack_from(self._b,0))
return s
##############################################################################################################
class CanChannel(object):
def __init__(self,dll:ctypes.CDLL,handle:int, channel:int) -> object:
self._dll = dll
self._handle = handle
self._channel = channel
self._start = False
def start(self, cfg:CanInitConfig) -> bool:
if self._start :
return True
result = self._dll.CAN_ChannelStart(self._handle,self._channel,ctypes.c_char_p(cfg._b.raw))
if result:
self._start = True
self._cfg = cfg
return True
else:
return False
def stop(self) -> bool:
result = self._dll.CAN_ChannelStop(self._handle,self._channel)
if result:
self._start = False
return True
else:
return False
def __str__(self) -> str:
return str(self._cfg) if self._start else ""
##############################################################################################################
class AcUsbCan (object):
def __init__(self,dll:str,DeviceType_xx:DeviceType,usbIndex:int = 0) -> object:
self._device = DeviceType_xx.value
self._usbIndex = usbIndex
self._information = ()
self._handle = 0
try:
self._dll = ctypes.cdll.LoadLibrary(dll)
except:
raise
def open(self) -> bool:
self._handle = self._dll.CAN_DeviceOpen(self._device,self._usbIndex,ctypes.c_char_p(b""))
self.get_deviceInfo()
self.CAN0 = CanChannel(self._dll,self._handle,0)
if self._information[5] > 1:
self.CAN1 = CanChannel(self._dll,self._handle,1)
if self._handle == 0:
raise Exception("打开 usb 索引 {0} 的 {1} 设备失败!".format(self._usbIndex,DeviceType(self._device).name))
else:
return True
def close(self) -> bool:
result = self._dll.CAN_DeviceClose(self._handle)
if result:
del(self.CAN0)
if(self._information[5] > 1):
del(self.CAN1)
return True
else:
return False
def get_deviceInfo(self) -> bool:
s = struct.Struct('< 5H B 20s 40s 20s')
b = ctypes.create_string_buffer(s.size)
if self._dll.CAN_GetDeviceInfo(self._handle,b):
self._information = (s.unpack_from(b,0))
# print(self)
return True
else:
self._information = ()
return False
def __str__(self) -> str:
if self._information:
s_info = "{0:8}: {1:x}.{2:x} \r\n".format("硬件版本",self._information[0]>>8,self._information[0]&0xff)
s_info += "{0:8}: {1:x}.{2:x} \r\n".format("固件版本",self._information[1]>>8,self._information[1]&0xff)
s_info += "{0:8}: {1:x}.{2:x} \r\n".format("驱动版本",self._information[2]>>8,self._information[2]&0xff)
s_info += "{0:7}: {1:x}.{2:x} \r\n".format("接口库版本",self._information[3]>>8,self._information[3]&0xff)
s_info += "{0:9}: {1} \r\n".format("中断号",self._information[4])
s_info += "{0:9}: {1} \r\n".format("总路数",self._information[5])
s_info += "{0:7}: {1} \r\n".format("设备序列号",self._information[6].decode())
s_info += "{0:8}: {1} \r\n".format("硬件类型",self._information[7].decode())
s_info += "{0:8}: {1}".format("设备描述",self._information[8].decode())
return s_info
else:
return "<class AcUsbCan> id:{0:#x} \r\nuse 'get_deviceInfo()' for more information.".format(id(self))
##############################################################################################################
from types import DynamicClassAttribute
class SendType(enum.Enum):
Normal = 0
Once = 1
Lookback = 2
LookbackOnce = 3
@DynamicClassAttribute
def annotation(self) -> str:
_translate = {
0:"正常发送模式",
1:"单次正常发送",
2:"正常自发自收",
3:"单次自发自收",
}
return _translate[self.value]
class DataFrame(ctypes.Structure):
_fields_ = [
('_TimeFlag', ctypes.c_uint32),
('_SendType', ctypes.c_byte),
('_RemoteFlag',ctypes.c_byte),
('_ExternFlag',ctypes.c_byte),
('_DataLen',ctypes.c_int8,),
('_ID',ctypes.c_uint32),
('_arrayData',ctypes.c_byte*8)
]
def __init__(self, id:int = 0,data:bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00',isExtend:bool = True,
sendType:SendType=SendType.Lookback,isRemote:bool=False,*args: Any, **kw: Any) -> None:
super().__init__(*args, **kw)
self._SendType = sendType.value
self._RemoteFlag = 1 if isRemote else 0
self._ExternFlag = 1 if isExtend else 0
self._DataLen = 8 if len(data)>8 else len(data)
self._ID = id
self._arrayData = (ctypes.c_byte*8)()
ctypes.memmove(self._arrayData,data,8 if len(data)>8 else len(data))
@property
def data(self):
return tuple([ i for i in self._arrayData])
@data.setter
def data(self, value:bytes):
_a = bytearray(8)
for i in range(8 if len(value)>8 else len(value)):
_a[i] = value[i]
self._arrayData = (ctypes.c_byte*8).from_buffer_copy(_a,0)
print(f"{len(value)=}")
self._DataLen = 8 if len(value)>8 else len(value)
@property
def dlc(self):
return self._DataLen
@property
def id(self):
return self._ID & 0x1FFFFFFFF if self._ExternFlag else self._ID & 0x3FF
@id.setter
def id(self,value:int):
self.ID = value
@property
def remote(self):
return True if self._RemoteFlag == 1 else False
@remote.setter
def remote(self,value:bool):
self._RemoteFlag = 1 if value else 0
@property
def extend(self):
return True if self._ExternFlag == 1 else False
@extend.setter
def extend(self,value:bool):
self._ExternFlag = 1 if value else 0
@property
def mode(self):
return SendType(self._SendType).annotation
@mode.setter
def mode(self,value:SendType):
self._SendType = value.value
def __str__(self) -> str:
return super().__str__()
##############################################################################################################
if platform.architecture()[0] == '32bit':
dll_filename = r"./NJLike/32bit/CanCmd.dll"
else:
dll_filename = r"./NJLike/64bit/CanCmd.dll"
usbcan = AcUsbCan(dll_filename,DeviceType.ACUSB_132B)
usbcan.open()
# usbcan.get_deviceInfo()
# print(usbcan)
init = CanInitConfig()
init.set_WorkMode(WorkMode.Listen)
init.set_Baudrate(Baudrate._125Kbps)
init.set_Filter(FilterMode.Filter_Single)
init.set_Acc(0xAA55)
init.set_Mask(0x55AA)
# usbcan.CAN0.start(init)
can0 = usbcan.CAN0
can0.start(init)
print(can0)
# import time
# time.sleep(3)
# can0.stop()
data = DataFrame()
data.ID = 0x12345
data.extend = False
print(data)
print(data.data)
print(data.dlc)
print(hex(data.id))
data.data = b'ABC'
print(data.data)
print(data.dlc)
print(hex(data.id))
print(data._fields_)
print(data._fields_[6][1][0])
print(data.mode)
data.mode = SendType.Normal
print(data.mode)
# struct.pack("< I 4B I 8B",*(1,2,3,4,5,6,1,2,3,4,5,6,7,8))
# struct.pack("< I 4B I B",data._fields_)
# struct.pack("< I 4B I 8B",*data._fields_)
print(type(data._fields_))
memoryview(data).hex()
import datetime
t = datetime.datetime.now()
print(t.strftime("%Y/%m/%d %H:%M:%S"))
t1 = datetime.datetime.fromtimestamp(0)
print(t1.strftime("%Y/%m/%d %H:%M:%S"))
# a = (1,2,3)
# b = ('a','b','c','d')
# list(zip(a,b))
a =[1,2,3,4,5]
b = list(a[:3])
b.append(a[3:])
print(a,b)
a.pop(3)
print(a)
t = b'12042504\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
print(t)
print(t.decode())
t.removesuffix(b'\x00')
# t.replace(b'\x00','')
print(t.decode().__repr__().strip('\x00'))
from USBCAN import *
import threading
from time import sleep
def receive_work(ch:Channel):
while True:
# print(f'当前未读CAN帧数:{ch.count}')
if ch.count:
_rec = ch.receive()
print(f'{threading.current_thread().name}({threading.current_thread().native_id:#010x})',
f'{ch.data_receive_time(_rec)} CAN{ch.channel} {str(_rec)}') if _rec else None
dev = AcusbCan_132B()
dev.CANO.start()
dat = CanDataFrame(0x12345678,b'abcdefgh')
for i in range(10):
print('*'*40)
if not dev.CANO.send(dat):
print(f'{dev.CANO.msg_error = }')
rx_thread = threading.Thread(target=receive_work,args=(dev.CANO,),name="CAN接收线程",daemon=True)
print(f'{threading.active_count() = }')
rx_thread.start()
# rx_thread.run()
print(f'{threading.active_count() = }')
print(f'{threading.current_thread().name = }')
for i in range(10):
print('#######################################')
if not dev.CANO.send(dat):
print(f'{dev.CANO.msg_error = }')
del rx_thread
dev.close()
print('-'*40)
import threading
import time
a = 5
def test(a):
while True:
print('我是子线程', a)
time.sleep(5)
thread = threading.Thread(target=test, args=(a, ),daemon=True)
thread.start()
for i in range(5):
print('我是主线程')
time.sleep(1)
from broadcast_service import broadcast_service
# callback of common method
def handle_msg(params):
print(f"handle_msg receive params: {params}")
# callback of decorator
@broadcast_service.on_listen(['my_topic'])
def handle_decorator_msg(params):
print(f"handle_decorator_msg receive params: {params}")
if __name__ == '__main__':
info = 'This is very important msg'
# subscribe topic
broadcast_service.subscribe('my_topic', handle_msg)
# publish broadcast
broadcast_service.publish('my_topic', info)
handle_decorator_msg receive params: This is very important msg
handle_msg receive params: This is very important msg