The Python interpreter comes pre-installed on the SlimLine Raspberry module. From Python you can use all the libraries available on the Internet and manage the extension modules connected to the CPU module. Hardware access goes through the libeS8CoreMng library that we provide, which contains definitions and functions for managing the system hardware, as well as various utility functions. To check the installed interpreter version, run the interpreter with the --version option. By default, both versions 2 and 3 are installed.
pi@raspberrypi:~ $ python --version
Python 2.7.16
pi@raspberrypi:~ $ python3 --version
Python 3.7.3
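Since two interpreters are installed side by side, a script may want to check which one is running it. The following is a minimal sketch using only the standard `sys` module; the helper names `version_string` and `is_python3` are illustrative and not part of any supplied library.

```python
import sys

def version_string(version_info=sys.version_info):
    """Return 'major.minor.micro' for a version tuple (defaults to the running interpreter)."""
    return "{}.{}.{}".format(version_info[0], version_info[1], version_info[2])

def is_python3(version_info=sys.version_info):
    """Return True when the given version tuple belongs to Python 3 or later."""
    return version_info[0] >= 3

# Report the interpreter that is executing this script.
print("Running on Python " + version_string())
```

Calling `is_python3()` at the top of a script is one way to select, for example, between passing `str` and `bytes` to C functions.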
The Ptp175 demonstration program contains a series of Python examples that can be used as a starting point for developing your own applications.
Managing I/O modules from Python
The S8CoreMng.py program initializes the extension bus, activating the watchdog (see the related article), and directly manages the digital and analog I/O modules connected to the extension bus. Copy the program to a folder on the SlimLine and copy the libeS8CoreMng.so library into the same folder. You can then run it with the Python interpreter (command: sudo python S8CoreMng.py). To modify the program, edit it with any text editor.
Program "S8CoreMng.py" (Ptp175)
# ******************************************************************************
# PROGRAM "S8CoreMng.py"
# ******************************************************************************
# Bibliography
# https://support.elsist.biz/en/articoli/funzioni-inizializzazione-libreria/
# https://support.elsist.biz/en/articoli/funzioni-gestione-bus-periferico/
# https://support.elsist.biz/en/Articles/watchdog-management-functions/
# https://support.elsist.biz/en/articoli/funzioni-gestione-moduli-periferici/
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# LOADING LIBRARIES
# ------------------------------------------------------------------------------
# Import other libraries
from __future__ import print_function
import time
# Loading dynamic link libraries.
# If the library is in the "/usr/lib" folder it can be loaded by name,
# otherwise the full path must be given.
from ctypes import *
eLib=cdll.LoadLibrary('./libeS8CoreMng.so')
# ------------------------------------------------------------------------------
# INITIALIZE THE LIBRARY
# ------------------------------------------------------------------------------
# Initialize the library; if an error occurs the program ends.
Result=eLib.eLibInit()
print("eLibInit result:", Result)
if (Result != 0):
    print("eLibInit error")
    quit()
# ------------------------------------------------------------------------------
# PRINT LIBRARY VERSION
# ------------------------------------------------------------------------------
# Defined an object to execute the "C" function that returns library version,
# and "restype" casts function's return value to defined type.
GLVersion=eLib.eGetLibVersion
GLVersion.restype=c_char_p
print("Library version:", GLVersion())
# ------------------------------------------------------------------------------
# PERIPHERAL BUS INITIALIZATION
# ------------------------------------------------------------------------------
# Defines which I2C device is used to manage the extension bus, and
# initializes it. As handled below, the function returns -1 on error;
# otherwise the value is the number of consecutive watchdog reboots.
# A bytes literal is passed so the C function receives a char* on both
# Python 2 and Python 3.
Result=eLib.ePhrI2cBusInit(b"/dev/i2c-4")
print("ePhrI2cBusInit result:", Result)
# Error on bus initialization: there is some hardware problem.
if (Result == -1):
    print("ePhrI2cBusInit error")
    quit()
# A value of 0 means a normal power up. Any other value means the system has
# been restarted by the watchdog, and "Result" is the number of consecutive
# watchdog interventions.
if (Result == 0):
    print("System power up")
if (Result != 0):
    print("System has been rebooted for {} times".format(Result))
# After a defined number of system reboots by watchdog interventions, a decision
# must be taken. In this example the program is stopped.
if (Result > 2):
    print("Too many watchdog reboots")
    eLib.eResetWDog() #Watchdog reset, it reinits the reboot counter
    quit()
# Set the peripheral bus ready; this activates all the attached modules. The
# second argument is the watchdog timeout (range 1 to 60000 ms).
# Setting the time to 0 disables the watchdog, so the system is never
# restarted; this is useful while testing the program. The commented line
# sets a 2 second timeout instead.
Result=False
while (Result != True):
    Result=eLib.ePhrI2cBusReady(True, 0)
    # Result=eLib.ePhrI2cBusReady(True, 2000)
# ------------------------------------------------------------------------------
# MANAGE THE EXTENSION MODULES
# ------------------------------------------------------------------------------
# Now it's possible to manage the extension modules.
# Check if some modules are attached to the extension bus.
PModules=eLib.eGetModulesOnPhrI2cBus()
print("On the extension bus are attached {} module(s)".format(PModules))
# Set the analog input acquisition mode "AD_VOLT_0_10_COMMON" on extension
# module 0 channel 0
if (eLib.eSetAnInpMode(0, 0, 2) == False):
    print("Analog input mode set error")
# Define and initialize all variables.
DInp=c_long(0) #Digital inputs value
DIAux=c_long(0) #DI auxiliary buffer
DOut=c_long(0) #Digital outputs value
DOAux=c_long(0) #DO auxiliary buffer
AInp=c_float(0) #Analog input value
# A loop is executed and in it the peripheral modules are managed.
# The program quits on DInp.2 activation.
while ((DInp.value&0x04) == 0):
    # Sleep for a while.
    time.sleep(1)
    # Read digital inputs from extension module 0, mode DI_8_LL and store value
    # on DInp variable. If an error occurs the program exits.
    if (eLib.eGetPhrDI(0, 1, pointer(DInp)) == False):
        print("Digital input acquisition error")
        break
    # On digital inputs change the acquired value is printed.
    # ("!=" replaces the Python 2 only "<>" operator.)
    if (DInp.value != DIAux.value):
        DIAux.value=DInp.value #DI auxiliary buffer
        print("DInp:", DInp.value)
    # Copy status of DInp.0 to DOut.0.
    if (DInp.value&0x01):
        DOut.value=DOut.value|0x01
    else:
        DOut.value=DOut.value&0xFE
    # Blink DOut.1
    DOut.value=DOut.value^0x02 #Digital outputs value
    # Manage the digital outputs on extension module 0, mode DO_8_LL
    if (eLib.eSetPhrDO(0, 1, pointer(DOut)) == False):
        print("Digital output management error")
        break
    # Read the analog input on extension module 0 channel 0.
    if (eLib.eGetAnInp(0, 0, pointer(AInp)) == False):
        print("Analog input acquisition error")
    print("AInp:", AInp.value)
# ------------------------------------------------------------------------------
# PROGRAM EXIT
# ------------------------------------------------------------------------------
# Before exit from the program the peripheral ready signal must be reset.
if (eLib.ePhrI2cBusReady(False, 0) == False):
    print("Peripheral bus ready error")
# [End of file]
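The bit handling inside the main loop (copy DInp.0 to DOut.0, blink DOut.1, quit on DInp.2) can be tried without any hardware attached. The sketch below reproduces that logic on plain integers. The helper names (`copy_bit`, `toggle_bit`, `bit_is_set`, `next_outputs`) are illustrative and do not belong to libeS8CoreMng.

```python
def bit_is_set(value, bit):
    """Return True if the given bit of value is 1."""
    return (value & (1 << bit)) != 0

def copy_bit(src, dst, bit):
    """Return dst with the given bit set to the state of the same bit in src."""
    mask = 1 << bit
    if src & mask:
        return dst | mask
    return dst & ~mask

def toggle_bit(value, bit):
    """Return value with the given bit inverted."""
    return value ^ (1 << bit)

def next_outputs(d_inp, d_out):
    """One loop iteration: mirror input bit 0 on output bit 0, blink output bit 1."""
    d_out = copy_bit(d_inp, d_out, 0)
    d_out = toggle_bit(d_out, 1)
    return d_out & 0xFF  # keep the 8 bits of a DO_8_LL module

# Simulate three iterations with input bit 0 active.
outputs = 0x00
for _ in range(3):
    outputs = next_outputs(0x01, outputs)
    print("DOut: 0x{:02X}".format(outputs))
```

Keeping the bit logic in small pure functions like these makes it possible to test the behaviour on a desktop machine before running the program on the SlimLine.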
TCP data exchange between Python and LogicLab programs
We have seen how the I/O modules connected to the system can be managed directly from Python. If you want to build automation, however, it is much more convenient to delegate the real-time management of the watchdog and of the I/O modules to a PLC program created with LogicLab, and to leave to Python the operations typical of high-level languages. In the "PyTCPServer" example, the two programs exchange data over TCP in JSON format: the PLC program sends the dividend and divisor values to the Python program, which replies with the result of the division.
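The request and reply format described above can be sketched as a single function that turns one JSON request into one JSON reply. This is an illustration only: the function name `handle_request` and the "Error" field are assumptions, since the example program defines only the "Dividend", "Divisor" and "Result" fields.

```python
import json

def handle_request(raw):
    """Decode a JSON request (bytes or str) and return the JSON reply as bytes."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    try:
        data = json.loads(raw)
        result = float(data["Dividend"]) / float(data["Divisor"])
        reply = {"Result": result}
    except (ValueError, KeyError, TypeError, ZeroDivisionError) as exc:
        # Hypothetical error reply: the original protocol defines no error field.
        reply = {"Error": str(exc)}
    return json.dumps(reply).encode("utf-8")

# Example exchange, as the PLC program would see it.
print(handle_request(b'{"Dividend": 10.0, "Divisor": 4.0}'))
```

Separating the protocol logic from the socket handling keeps malformed requests or a zero divisor from terminating the server with an unhandled exception.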
Program "PyTCPServer.py" (Ptp175)
# ******************************************************************************
# PROGRAM "PyTCPServer.py"
# ******************************************************************************
# Bibliography
# https://docs.python.it/html/lib/socket-objects.html
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# LOADING LIBRARIES
# ------------------------------------------------------------------------------
# Import other libraries
from __future__ import print_function
import time
import socket
import json
# some JSON:
x = '{ "name":"John", "age":30, "city":"New York"}'
y = json.loads(x)
print(y["age"])
# ------------------------------------------------------------------------------
# CREATE SOCKET
# ------------------------------------------------------------------------------
# Create a socket and ensure that you can restart your server quickly when it
# terminates.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind the server socket to TCP port 1000 on all interfaces (ports below 1024
# require root privileges) and set the number of clients waiting for
# connection that can be queued.
sock.bind(('', 1000))
sock.listen(5)
print("TCP server waiting on port: 1000")
# ------------------------------------------------------------------------------
# MANAGE SOCKET
# ------------------------------------------------------------------------------
# Loop waiting for connections (terminate with Ctrl-C)
try:
    while (True):
        NSocket, address = sock.accept()
        print("Connected from", address)
        # Loop serving the new client
        while (True):
            RxData = NSocket.recv(1024)
            # If the peer closes the connection.
            if not RxData:
                print("Peer has closed the connection")
                break
            # On a "Quit" command exit from loop. The check is done before
            # decoding, because "Quit" is not valid JSON.
            if (RxData.strip() == b"Quit"): break
            # Decode the received data and calculate the result.
            JData=json.loads(RxData.decode())
            TxData=json.dumps({"Result":JData["Dividend"]/JData["Divisor"]})
            print("Received:%s -> Sent:%s"%(RxData, TxData))
            # Send back the result (as bytes, valid on Python 2 and 3)
            NSocket.sendall(TxData.encode())
        NSocket.close()
        print("Disconnected from", address)
        # On a "Quit" command exit from program.
        if (RxData.strip() == b"Quit"): break
# On exit close the socket.
finally:
    sock.close()
# [End of file]
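To try the server without the PLC program, a small Python client can send the same kind of request. The following is a minimal sketch: the function name `request_division` and its parameters are illustrative, while the port number and the JSON field names are taken from the example above.

```python
import json
import socket

def request_division(dividend, divisor, host="localhost", port=1000, timeout=2.0):
    """Send one JSON request to the TCP server and return the "Result" value."""
    request = json.dumps({"Dividend": dividend, "Divisor": divisor}).encode("utf-8")
    sock = socket.create_connection((host, port), timeout=timeout)
    try:
        sock.sendall(request)
        # The reply is short, so a single recv() is assumed to be enough here.
        answer = sock.recv(1024)
    finally:
        sock.close()
    return json.loads(answer.decode("utf-8"))["Result"]
```

With PyTCPServer.py running on the same machine, `print(request_division(10.0, 4.0))` should print the quotient returned by the server.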
Program "ST_PyTCPServer" (Ptp175)
PROGRAM ST_PyTCPServer
VAR
    i : UDINT; (* Auxiliary variable *)
    Fp : eFILEP; (* File pointer *)
    Dividend : REAL; (* Dividend value *)
    Divisor : REAL; (* Divisor value *)
    Result : REAL; (* Result value *)
    CaseNr : USINT; (* Program case *)
    TimeBf : UDINT; (* Time buffer (uS) *)
    Request : STRING[ 128 ]; (* Request string *)
    Answer : STRING[ 128 ]; (* Answer string *)
    TCPClient : SysTCPClient; (* TCP client management *)
    JDecode : JSONDecode_v2; (* JSON decode *)
    JEncode : JSONEncode_v1; (* JSON encode *)
END_VAR
// *****************************************************************************
// PROGRAM "ST_PyTCPServer"
// *****************************************************************************
// A TCP client is instantiated, it connects to the Python server and send to it
// a JSON request and receive back a JSON answer.
// -----------------------------------------------------------------------------
    // -------------------------------------------------------------------------
    // INITIALIZATION
    // -------------------------------------------------------------------------
    // First program execution loop initializations.

    IF (SysFirstLoop) THEN
        TCPClient.PeerAdd:=ADR('localhost'); //Peer address
        TCPClient.PeerPort:=1000; //Peer port
        TCPClient.LocalAdd:=ADR('0.0.0.0'); //Local address
        TCPClient.LocalPort:=0; //Local port
        TCPClient.FlushTm:=50; //Flush time (mS)
        TCPClient.LifeTm:=20; //Life time (S)
        TCPClient.RxSize:=128; //Rx buffer size
        TCPClient.TxSize:=128; //Tx buffer size

        CaseNr:=0; //Program case
        TimeBf:=SysGetSysTime(TRUE); //Time buffer (uS)
    END_IF;

    // Manage the TCP client.

    TCPClient(Connect:=TRUE); //TCPClient management

    // -------------------------------------------------------------------------
    // PROGRAM CASES
    // -------------------------------------------------------------------------
    // Manage program cases.

    CASE (CaseNr) OF

        // ---------------------------------------------------------------------
        // Wait for one second and for the TCP connection to be open.

        0:
        IF ((SysGetSysTime(TRUE)-TimeBf) < 1000000) THEN RETURN; END_IF;
        IF NOT(SysFIsOpen(TCPClient.File)) THEN RETURN; END_IF;
        TimeBf:=SysGetSysTime(TRUE); //Time buffer (uS)

        // Randomize the two operands and send them to the Python program.

        Dividend:=1.0+SysGetRandom(TRUE)*1000.0; //Dividend value
        Divisor:=1.0+SysGetRandom(TRUE)*1000.0; //Divisor value
        i:=Sysmemset(ADR(Request), 0, SIZEOF(Request)); //Initialize the request string
        JEncode(Object:=ADR(Request), OSize:=SIZEOF(Request), Name:=ADR('Dividend'), VType:=REAL_TYPE, VAddress:=ADR(Dividend), Count:=1);
        JEncode(Object:=ADR(Request), OSize:=SIZEOF(Request), Name:=ADR('Divisor'), VType:=REAL_TYPE, VAddress:=ADR(Divisor), Count:=1);
        i:=Sysfwrite(ADR(Request), TO_INT(Sysstrlen(ADR(Request))), 1, TCPClient.File);
        CaseNr:=CaseNr+1; //Program case

        // ---------------------------------------------------------------------
        // Wait for the answer from the Python program. A one second timeout
        // is managed.

        1:
        IF ((SysGetSysTime(TRUE)-TimeBf) > 1000000) THEN CaseNr:=0; RETURN; END_IF;
        IF (SysFGetIChars(TCPClient.File) = 0) THEN RETURN; END_IF;
        i:=Sysmemset(ADR(Answer), 0, SIZEOF(Answer)); //Initialize the answer string
        i:=Sysfread(ADR(Answer), TO_INT(SIZEOF(Answer)), 1, TCPClient.File);
        JDecode(Object:=ADR(Answer), Name:=ADR('Result'), VType:=REAL_TYPE, VAddress:=ADR(Result), Count:=1);
        CaseNr:=0; //Program case
    END_CASE;
// [End of file]