本文共 24533 字,大约阅读时间需要 81 分钟。
python传递数据
当我被要求做一项使我的脊椎发抖的任务时,我曾有一段时间担任软件工程师。 这样的时刻是我不得不在一些需要C的新硬件基础设施和主要是Python的云基础设施之间编写接口。
一种策略可能是 ,Python设计支持 。 快速浏览文档说明这将意味着编写大量的C。在某些情况下这可能会很好,但是我不喜欢这样做。 另一种策略是将这两个任务放在单独的进程中,并使用在两者之间交换消息。
当我在发现ZeroMQ之前经历了这种情况时,我经历了扩展编写路径。 并没有那么糟糕,但是却非常耗时且令人费解。 如今,为了避免这种情况,我将系统细分为独立的进程,这些进程通过通过发送的消息交换信息。 通过这种方法,几种编程语言可以共存,并且每个过程都更简单,因此更容易调试。
ZeroMQ提供了更简单的过程:
ZeroMQ项目的创始人之一是 ,他是一位杰出的人物,有着 。
对于本教程,您将需要:
使用以下命令在Fedora上安装它们:
$ dnf install clang zeromq zeromq-devel python3 python3-zmq
对于Debian或Ubuntu:
$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
如果遇到任何问题,请参考每个项目的安装说明(在上面链接)。
由于这是一个假设的场景,因此本教程将编写一个具有两个功能的虚拟库:
将库的完整源代码保存到名为libfancyhw.h的文件中:
#ifndef LIBFANCYHW_H #define LIBFANCYHW_H #include#include // This is the fictitious hardware interfacing library void fancyhw_init ( unsigned int init_param ) { ( init_param ) ; } int16_t fancyhw_read_val ( void ) { return ( int16_t ) ( ) ; } #endif
借助随机数生成器,该库可以模拟您希望在语言之间传递的数据。
下面将逐步编写C接口-从包括库到管理数据传输。
首先加载必要的库(每个库的目的在代码的注释中):
// For printf() #include// For EXIT_* #include // For memcpy() #include // For sleep() #include #include #include "libfancyhw.h"
定义程序其余部分所需的主要功能和重要参数:
int main ( void ) { const unsigned int INIT_PARAM = 12345 ; const unsigned int REPETITIONS = 10 ; const unsigned int PACKET_SIZE = 16 ; const char * TOPIC = "fancyhw_data" ; ...
这两个库都需要进行一些初始化。 虚拟的只需要一个参数:
fancyhw_init ( INIT_PARAM ) ;
ZeroMQ库需要一些实际的初始化。 首先,定义一个上下文 —一个管理所有套接字的对象:
void * context = zmq_ctx_new ( ) ; if ( ! context ) { ( "ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; }
然后定义用于传递数据的套接字。 ZeroMQ支持几种类型的套接字,每种都有其应用程序。 使用发布套接字(也称为PUB套接字),该套接字可以将消息的副本传递给多个接收者。 这种方法使您可以附加几个将都收到相同消息的接收者。 如果没有接收者,则消息将被丢弃(即,它们将不会排队)。 为此,请执行以下操作:
void * data_socket = zmq_socket ( context , ZMQ_PUB ) ;
套接字必须绑定到一个地址,以便客户端知道要连接的位置。 在这种情况下,请使用 (还有 ,但是TCP是一个很好的默认选择):
const int rb = zmq_bind ( data_socket , "tcp://*:5555" ) ; if ( rb != 0 ) { ( "ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; }
接下来,计算一些以后将需要的有用值。 注意下面代码中的TOPIC ; PUB套接字需要一个主题与其发送的消息相关联。 接收者可以使用主题来过滤消息:
const size_t topic_size = ( TOPIC ) ; const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof ( int16_t ) ; ( "Topic: %s; topic size: %zu; Envelope size: %zu \n " , TOPIC , topic_size , envelope_size ) ;
启动一个循环,发送REPETITIONS消息:
for ( unsigned int i = 0 ; i < REPETITIONS ; i ++ ) { ...
发送消息之前,请填充PACKET_SIZE值的缓冲区。 该库提供16位有符号整数。 由于未定义C中int的维,因此请使用具有特定宽度的int :
int16_t buffer [ PACKET_SIZE ] ; for ( unsigned int j = 0 ; j < PACKET_SIZE ; j ++ ) { buffer [ j ] = fancyhw_read_val ( ) ; } ( "Read %u data values \n " , PACKET_SIZE ) ;
消息准备和传递的第一步是创建ZeroMQ消息并分配消息所需的内存。 此空消息是用于存储您将要发送的数据的信封:
zmq_msg_t envelope ; const int rmi = zmq_msg_init_size ( & envelope , envelope_size ) ; if ( rmi != 0 ) { ( "ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s \n " , zmq_strerror ( errno ) ) ; zmq_msg_close ( & envelope ) ; break ; }
现在已经分配了内存,将数据存储在ZeroMQ消息“信封”中。 zmq_msg_data()函数返回一个指针,该指针指向信封中缓冲区的开头。 第一部分是主题,其后是空格,然后是二进制数据。 在主题和数据之间添加空格作为分隔符。 要沿着缓冲区移动,必须使用强制转换和 。 (感谢C,让事情变得简单明了。)执行以下操作:
( zmq_msg_data ( & envelope ) , TOPIC , topic_size ) ; ( ( void * ) ( ( char * ) zmq_msg_data ( & envelope ) + topic_size ) , " " , 1 ) ; ( ( void * ) ( ( char * ) zmq_msg_data ( & envelope ) + 1 + topic_size ) , buffer , PACKET_SIZE * sizeof ( int16_t ) ) ;
通过data_socket发送消息:
const size_t rs = zmq_msg_send ( & envelope , data_socket , 0 ) ; if ( rs != envelope_size ) { ( "ERROR: ZeroMQ error occurred during zmq_msg_send(): %s \n " , zmq_strerror ( errno ) ) ; zmq_msg_close ( & envelope ) ; break ; }
使用信封后,请确保将其丢弃:
zmq_msg_close ( & envelope ) ; ( "Message sent; i: %u, topic: %s \n " , i , TOPIC ) ;
因为C不提供 ,所以您必须整理一下。 发送完消息后,使用释放释放的内存所需的清理程序关闭程序:
const int rc = zmq_close ( data_socket ) ; if ( rc != 0 ) { ( "ERROR: ZeroMQ error occurred during zmq_close(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; } const int rd = zmq_ctx_destroy ( context ) ; if ( rd != 0 ) { ( "Error occurred during zmq_ctx_destroy(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; } return EXIT_SUCCESS ;
将下面的完整接口库保存到名为hw_interface.c的本地文件中:
// For printf() #include// For EXIT_* #include // For memcpy() #include // For sleep() #include #include #include "libfancyhw.h" int main ( void ) { const unsigned int INIT_PARAM = 12345 ; const unsigned int REPETITIONS = 10 ; const unsigned int PACKET_SIZE = 16 ; const char * TOPIC = "fancyhw_data" ; fancyhw_init ( INIT_PARAM ) ; void * context = zmq_ctx_new ( ) ; if ( ! context ) { ( "ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; } void * data_socket = zmq_socket ( context , ZMQ_PUB ) ; const int rb = zmq_bind ( data_socket , "tcp://*:5555" ) ; if ( rb != 0 ) { ( "ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; } const size_t topic_size = ( TOPIC ) ; const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof ( int16_t ) ; ( "Topic: %s; topic size: %zu; Envelope size: %zu \n " , TOPIC , topic_size , envelope_size ) ; for ( unsigned int i = 0 ; i < REPETITIONS ; i ++ ) { int16_t buffer [ PACKET_SIZE ] ; for ( unsigned int j = 0 ; j < PACKET_SIZE ; j ++ ) { buffer [ j ] = fancyhw_read_val ( ) ; } ( "Read %u data values \n " , PACKET_SIZE ) ; zmq_msg_t envelope ; const int rmi = zmq_msg_init_size ( & envelope , envelope_size ) ; if ( rmi != 0 ) { ( "ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s \n " , zmq_strerror ( errno ) ) ; zmq_msg_close ( & envelope ) ; break ; } ( zmq_msg_data ( & envelope ) , TOPIC , topic_size ) ; ( ( void * ) ( ( char * ) zmq_msg_data ( & envelope ) + topic_size ) , " " , 1 ) ; ( ( void * ) ( ( char * ) zmq_msg_data ( & envelope ) + 1 + topic_size ) , buffer , PACKET_SIZE * sizeof ( int16_t ) ) ; const size_t rs = zmq_msg_send ( & envelope , data_socket , 0 ) ; if ( rs != envelope_size ) { ( "ERROR: ZeroMQ error occurred during zmq_msg_send(): %s \n " , zmq_strerror ( errno ) ) ; zmq_msg_close ( & envelope ) ; break ; } zmq_msg_close ( & envelope ) ; ( "Message sent; i: %u, topic: %s \n " , i , TOPIC ) ; sleep ( 1 ) ; } const int rc = zmq_close ( data_socket ) ; if ( rc != 0 ) { ( "ERROR: ZeroMQ error occurred during zmq_close(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; } const int rd = zmq_ctx_destroy ( context ) ; if ( rd != 0 ) { ( "Error occurred during zmq_ctx_destroy(): %s \n " , zmq_strerror ( errno ) ) ; return EXIT_FAILURE ; } return EXIT_SUCCESS ; }
使用以下命令进行编译:
$ clang -std =c99 -I. hw_interface.c -lzmq -o hw_interface
如果没有编译错误,则可以运行该界面。 很棒的是ZeroMQ PUB套接字可以在没有任何应用程序发送或检索数据的情况下运行。 这降低了复杂性,因为没有义务要求首先开始哪个过程。
运行界面:
$ . / hw_interface Topic: fancyhw_data; topic size: 12 ; Envelope size: 45 Read 16 data values Message sent; i: 0 , topic: fancyhw_data Read 16 data values Message sent; i: 1 , topic: fancyhw_data Read 16 data values ... ...
输出显示通过ZeroMQ发送的数据。 现在,您需要一个应用程序来读取数据。
现在,您可以将数据从C传递到Python应用程序了。
您需要两个库来帮助传输数据。 首先,您需要Python中的ZeroMQ绑定:
$ python3 -m pip install zmq
另一个是 ,它解码二进制数据。 它在Python标准库中通常可用,因此无需pip安装它。
Python程序的第一部分导入了这两个库:
import zmq import struct
要使用ZeroMQ,您必须订阅上述常量TOPIC中使用的相同主题:
topic = "fancyhw_data" . encode ( 'ascii' ) print ( "Reading messages with topic: {}" . format ( topic ) )
接下来,初始化上下文和套接字。 使用订阅套接字(也称为SUB套接字),它是PUB套接字的自然伙伴。 套接字还需要订阅正确的主题:
with zmq. Context ( ) as context: socket = context. socket ( zmq. SUB ) socket . connect ( "tcp://127.0.0.1:5555" ) socket . setsockopt ( zmq. SUBSCRIBE , topic ) i = 0 ...
启动一个无限循环,等待新消息传递到SUB套接字。 如果按Ctrl + C或发生错误,则循环将关闭:
try : while True : ... # we will fill this in next except KeyboardInterrupt : socket . close ( ) except Exception as error: print ( "ERROR: {}" . format ( error ) ) socket . close ( )
循环等待新消息通过recv()方法到达。 然后,它拆分在第一个空格处接收到的所有内容,以将主题与内容分开:
binary_topic , data_buffer = socket . recv ( ) . split ( b ' ' , 1 )
Python尚不知道该主题是字符串,因此请使用标准ASCII编码对其进行解码:
topic = binary_topic. decode ( encoding = 'ascii' ) print ( "Message {:d}:" . format ( i ) ) print ( " \t topic: '{}'" . format ( topic ) )
下一步是使用struct库读取二进制数据,该库可以将无形状的二进制blob转换为有效值。 首先,计算存储在数据包中的值的数量。 本示例使用16位带符号整数,它们与struct 的“ h”相对应:
packet_size = len ( data_buffer ) // struct . calcsize ( "h" ) print ( " \t packet size: {:d}" . format ( packet_size ) )
通过知道数据包中有多少个值,您可以通过准备一个带有值的数量及其类型(例如,“ 16h ”)的字符串来定义格式:
struct_format = "{:d}h" . format ( packet_size )
将该二进制blob转换为可以立即打印的一系列数字:
data = struct . unpack ( struct_format , data_buffer ) print ( " \t data: {}" . format ( data ) )
这是Python中完整的数据接收器:
#! /usr/bin/env python3 import zmq import struct topic = "fancyhw_data" . encode ( 'ascii' ) print ( "Reading messages with topic: {}" . format ( topic ) ) with zmq. Context ( ) as context: socket = context. socket ( zmq. SUB ) socket . connect ( "tcp://127.0.0.1:5555" ) socket . setsockopt ( zmq. SUBSCRIBE , topic ) i = 0 try : while True : binary_topic , data_buffer = socket . recv ( ) . split ( b ' ' , 1 ) topic = binary_topic. decode ( encoding = 'ascii' ) print ( "Message {:d}:" . format ( i ) ) print ( " \t topic: '{}'" . format ( topic ) ) packet_size = len ( data_buffer ) // struct . calcsize ( "h" ) print ( " \t packet size: {:d}" . format ( packet_size ) ) struct_format = "{:d}h" . format ( packet_size ) data = struct . unpack ( struct_format , data_buffer ) print ( " \t data: {}" . format ( data ) ) i + = 1 except KeyboardInterrupt : socket . close ( ) except Exception as error: print ( "ERROR: {}" . format ( error ) ) socket . close ( )
将其保存到名为online_analysis.py的文件中。 不需要编译Python,因此您可以立即运行该程序。
这是输出:
$ . / online_analysis.py Reading messages with topic: b 'fancyhw_data' Message 0 : topic: 'fancyhw_data' packet size: 16 data: ( 20946 , - 23616 , 9865 , 31416 , - 15911 , - 10845 , - 5332 , 25662 , 10955 , - 32501 , - 18717 , - 24490 , - 16511 , - 28861 , 24205 , 26568 ) Message 1 : topic: 'fancyhw_data' packet size: 16 data: ( 12505 , 31355 , 14083 , - 19654 , - 9141 , 14532 , - 25591 , 31203 , 10428 , - 25564 , - 732 , - 7979 , 9529 , - 27982 , 29610 , 30475 ) ... ...
本教程还增加了我所谓的“软件粒度”。 换句话说,它将软件细分为较小的单元。 这种策略的好处之一是可以同时使用不同的编程语言,而最少的接口充当了它们之间的垫片。
实际上,这种设计允许软件工程师更加协作和独立地工作。 不同的团队可能会在分析的不同步骤上工作,选择他们喜欢的工具。 另一个好处是免费提供了并行性,因为所有进程都可以并行运行。 是一款了不起的软件,它使所有这些工作变得更加容易。
翻译自:
python传递数据
转载地址:http://wiszd.baihongyu.com/