programa SlimLine Framboesa em Python

No formulário SlimLine Raspberry o intérprete já está pré-instalado Python, isso permite que você o use por idioma Python todas as bibliotecas disponíveis na Internet e para gerenciar os módulos de extensão que podem ser conectados ao módulo da CPU graças à biblioteca libeS8CoreMng fornecido por nós que contém definições e funções para gerenciar o hardware do sistema, bem como várias funções úteis. Para verificar a versão do interpretador instalado, execute o comando –version que retornará a versão instalada, por padrão, as versões 2 e 3 estão instaladas.

pi@raspberrypi:~ $ python --version
Python 2.7.16
pi@raspberrypi:~ $ python3 --version
Python 3.7.3

O programa de demonstração Ptp175 contém uma série de exemplos em linguagem Python que pode ser usado como base para o desenvolvimento de seus próprios aplicativos.

Gerenciamento de módulo de E / S de Python

O programa S8CoreMng.py inicializa o barramento de extensão ativando o watchdog (Veja o artigo) e gerencia diretamente os módulos de E / S digital e analógica conectados ao barramento de extensão. Copie o programa para uma pasta no SlimLine copiando a biblioteca na mesma pasta libeS8CoreMng.so, agora você pode executá-lo com o intérprete python (Command sudo python S8CoreMng.py). Para modificar o programa, edite-o com um editor de texto comum.

Programa "S8CoreMng.py" (Ptp175)
# ******************************************************************************
# PROGRAM "S8CoreMng.py"
# ******************************************************************************
# Bibliography
# https://support.elsist.biz/en/articoli/funzioni-inizializzazione-libreria/
# https://support.elsist.biz/en/articoli/funzioni-gestione-bus-periferico/
# https://support.elsist.biz/en/Articles/watchdog-management-functions/
# https://support.elsist.biz/en/articoli/funzioni-gestione-moduli-periferici/
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
# LOADING LIBRARIES
# ------------------------------------------------------------------------------
# Import others libraries

from __future__ import print_function
import time

# Loading dynamic link libraries.
# If the library is in the "/usr/lib" folder must be accessed directly,
# otherwise the full path must be defined.

from ctypes import *
eLib=cdll.LoadLibrary('./libeS8CoreMng.so')

# ------------------------------------------------------------------------------
# INITIALIZE THE LIBRARY
# ------------------------------------------------------------------------------
# Initialize the library, if an error occours the program ends.

Result=eLib.eLibInit()
print("eLibInit result:", Result)

if (Result != 0):
    print("eLibInit error")
    quit()

# ------------------------------------------------------------------------------
# PRINT LIBRARY VERSION
# ------------------------------------------------------------------------------
# Defined an object to execute the "C" function that returns library version,
# and "restype" casts function's return value to defined type.

GLVersion=eLib.eGetLibVersion
GLVersion.restype=c_char_p
print("Library version:", GLVersion())

# ------------------------------------------------------------------------------
# PERIPHERAL BUS INITIALIZATION
# ------------------------------------------------------------------------------
# Defines wich I2C device has been to used to manage the extension bus, and
# initializes it. Function returns 0 if Ok or the error code.

Result=eLib.ePhrI2cBusInit("/dev/i2c-4");
print("ePhrI2cBusInit result:", Result)

# Error on bus initializing there's some hardware problem.

if (Result == -1):
    print("ePhrI2cBusInit error")
    quit()

# System has been restarted by a watchdog intervention. "Result" returns the
# number of subsequent watchdog interventions.

if (Result == 0):
    print("System power up")

if (Result != 0):
    print("System has been rebooted for {} times".format(Result))

# After a defined number of system reboots by watchdog interventions, a decision
# must be taken. In this example the program is stopped.

if (Result > 2):
    print("Too wdog reboot")
    eLib.eResetWDog() #Watchdog reset, it reinits the reboot counter
    quit()

# Set the peripheral bus ready, this activates all the attached modules. Set also
# the wdog timeout to 2 seconds. Time range (1 to 60000 mS).
# By setting time to 0, watch dog is disabled so the system is never restarted
# this is useful during the program test.

Result=False
while (Result != True):
    Result=eLib.ePhrI2cBusReady(True, 0)
#   Result=eLib.ePhrI2cBusReady(True, 2000)

# ------------------------------------------------------------------------------
# MANAGE THE EXTENSION MODULES
# ------------------------------------------------------------------------------
# Now it's possible to manage the extension modules.
# Check if some modules are attached to the extension bus.

PModules=eLib.eGetModulesOnPhrI2cBus()
print("On the extension bus are attached {} module(s)".format(PModules))

# Set the analog input acquisition mode "AD_VOLT_0_10_COMMON" on extension
# module 0 channel 0

if (eLib.eSetAnInpMode(0, 0, 2) == False):
    print("Analog input mode set error")

# Define and initilize al variables.

DInp=c_long(0) #Digital inputs value
DIAux=c_long(0) #DI auxiliary buffer
DOut=c_long(0) #Digital ouputs value
DOAux=c_long(0) #DO auxiliary buffer
AInp=c_float(0) #Analog input value

# A forever loop is executed and in it the peripheral modules are managed.
# program quits on DInp.2 activation.

while (DInp.value&0x04 == 0):

    # Sleep for a while.

    time.sleep(1)

    # Read digital inputs from extension module 0, mode DI_8_LL and store value
    # on DInp variable. If an error occurs the program exits.

    if (eLib.eGetPhrDI(0, 1, pointer(DInp)) == False):
        print("Digital input acquisition error")
        break

    # On digital inputs change the acquired value is printed.

    if (DInp.value <> DIAux.value):
        DIAux.value=DInp.value #DI auxiliary buffer
        print("DInp:", DInp.value)

    # Copy status of DInp.0 to DOut.0.

    if (DInp.value&0x01):
        DOut.value=DOut.value|0x01
    else:
        DOut.value=DOut.value&0xFE

    # Blink DOut.1

    DOut.value=DOut.value^0x02 #Digital ouputs value

    # Manage the digital outputs on extension module 0, mode DO_8_LL
     
    if (eLib.eSetPhrDO(0, 1, pointer(DOut)) == False):
        print("Digital output management error")
        break

    if (eLib.eGetAnInp(0, 0, pointer(AInp)) == False):
        print("Analog input acquisition error")

    print("AInp:", AInp.value)

# ------------------------------------------------------------------------------
# PROGRAM EXIT
# ------------------------------------------------------------------------------
# Before exit from the program the peripheral ready signal must be reset.

if (eLib.ePhrI2cBusReady(False, 0) == False):
    print("Peripheral bus ready error")

# [End of file]

Troca de dados TCP entre programas Python e LogicLab

Vimos como isso é possível diretamente de Python gerenciar os módulos de E / S conectados ao sistema, mas se você deseja criar automações é muito mais conveniente delegar o gerenciamento do watchdog e dos módulos de E / S em tempo real a partir de um programa PLC criado com Python as operações típicas de linguagens avançadas. No exemplo "PyTCPServer"A troca de dados é gerenciada via TCP em formato JSON entre os 2 programas, o programa PLC envia os valores de dividendo e divisor para o programa Python quem responde com o resultado da divisão.

Programa "PyTCPServer.py" (Ptp175)
# ******************************************************************************
# PROGRAM "PyTCPServer.py"
# ******************************************************************************
# Bibliography
# https://docs.python.it/html/lib/socket-objects.html
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
# LOADING LIBRARIES
# ------------------------------------------------------------------------------
# Import others libraries

from __future__ import print_function
import time
import socket
import json

# some JSON:
x =  '{ "name":"John", "age":30, "city":"New York"}'
y = json.loads(x)
print(y["age"])

# ------------------------------------------------------------------------------
# CREATE SOCKET
# ------------------------------------------------------------------------------
# Create a socket and ensure that you can restart your server quickly when it
# terminates.

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# Set the client socket's TCP port number and the number of clients waiting
# for connection that can be queued.

sock.bind(('', 1000))
sock.listen(5)
print ("TCP server waiting on port: 1000")

# ------------------------------------------------------------------------------
# MANAGE SOCKET
# ------------------------------------------------------------------------------
# loop waiting for connections (terminate with Ctrl-C)

try:
    while (True):
        NSocket, address = sock.accept()
        print ("Connected from", address)

        # Loop serving the new client

        while (True):
            RxData = NSocket.recv(1024)

            # If peer close the connection.
        
            if not RxData:
                print ("Peer has closed the connection")
                break

            # Decode the received data it.

            JData=json.loads(RxData)
            TxData=json.dumps({"Result":JData["Dividend"]/JData["Divisor"]})
            print ("Received:%s -> Sent:%s"%(RxData, TxData))

            # Send back the result

            NSocket.send(str(TxData))

            # On a "Quit" command exit from loop.

            if (RxData == "Quit"): break

        NSocket.close()
        print ("Disconnected from", address)

        # On a "Quit" command exit from program.

        if (RxData == "Quit"): break

# On exit close the socket.

finally:
    sock.close()

# [End of file]
Programa "ST_PyTCPServer" (Ptp175)
PROGRAM ST_PyTCPServer
VAR
    i : UDINT; (* Auxiliary variable *)
    Fp : eFILEP; (* File pointer *)
    Dividend : REAL; (* Dividend value *)
    Divisor : REAL; (* Divisor value *)
    Result : REAL; (* Result value *)
    CaseNr : USINT; (* Program case *)
    TimeBf : UDINT; (* Time buffer (uS) *)
    Request : STRING[ 128 ];  (* Request string *)
    Answer : STRING[ 128 ]; (* Answer string *)
    TCPClient : SysTCPClient; (* TCP client management *)
    JDecode : JSONDecode_v2; (* JSON decode *)
    JEncode : JSONEncode_v1; (* JSON encode *)
END_VAR

// *****************************************************************************
// PROGRAM "ST_PyTCPServer"
// *****************************************************************************
// A TCP client is instantiated, it connects to the Python server and send to it
// a JSON request and receive back a JSON answer.
// -----------------------------------------------------------------------------

    // -------------------------------------------------------------------------
    // INITIALIZATION
    // -------------------------------------------------------------------------
    // First program execution loop initializations.

    IF (SysFirstLoop) THEN
        TCPClient.PeerAdd:=ADR('localhost'); //Peer address
        TCPClient.PeerPort:=1000; //Peer port
        TCPClient.LocalAdd:=ADR('0.0.0.0'); //Local address
        TCPClient.LocalPort:=0; //Local port
        TCPClient.FlushTm:=50; //Flush time (mS)
        TCPClient.LifeTm:=20; //Life time (S)
        TCPClient.RxSize:=128; //Rx buffer size
        TCPClient.TxSize:=128; //Tx buffer size

        CaseNr:=0; //Program case
        TimeBf:=SysGetSysTime(TRUE); //Time buffer (uS)
    END_IF;

    // Manage the TCP client.

    TCPClient(Connect:=TRUE); //TCPClient management

    // -------------------------------------------------------------------------
    // PROGRAM CASES
    // -------------------------------------------------------------------------
    // Manage program cases.

    CASE (CaseNr) OF

        // ---------------------------------------------------------------------
        // Waits for a while and TCP connection open.

        0:
        IF ((SysGetSysTime(TRUE)-TimeBf) < 1000000) THEN RETURN; END_IF;
        IF NOT(SysFIsOpen(TCPClient.File)) THEN RETURN; END_IF;
        TimeBf:=SysGetSysTime(TRUE); //Time buffer (uS)

        // Randomize the two operators and send them to Python program.

        Dividend:=1.0+SysGetRandom(TRUE)*1000.0; //Dividend value    
        Divisor:=1.0+SysGetRandom(TRUE)*1000.0; //Divisor value    
        i:=Sysmemset(ADR(Request), 0, SIZEOF(Request)); //Initialize the request string

        JEncode(Object:=ADR(Request), OSize:=SIZEOF(Request), Name:=ADR('Dividend'), VType:=REAL_TYPE, VAddress:=ADR(Dividend), Count:=1);
        JEncode(Object:=ADR(Request), OSize:=SIZEOF(Request), Name:=ADR('Divisor'), VType:=REAL_TYPE, VAddress:=ADR(Divisor), Count:=1);
        i:=Sysfwrite(ADR(Request), TO_INT(Sysstrlen(ADR(Request))), 1, TCPClient.File);
        CaseNr:=CaseNr+1; //Program case

        // ---------------------------------------------------------------------
        // Waits for the Python program Request. A timeout is managed.

        1:
        IF ((SysGetSysTime(TRUE)-TimeBf) > 1000000) THEN CaseNr:=0; RETURN; END_IF;
        IF (SysFGetIChars(TCPClient.File) = 0) THEN RETURN; END_IF;

        i:=Sysmemset(ADR(Answer), 0, SIZEOF(Answer)); //Initialize the answer string
        i:=Sysfread(ADR(Answer), TO_INT(SIZEOF(Answer)), 1, TCPClient.File);
        JDecode(Object:=ADR(Answer), Name:=ADR('Result'), VType:=REAL_TYPE, VAddress:=ADR(Result), Count:=1);
        CaseNr:=0; //Program case
    END_CASE;

// [End of file]
Esse artigo foi útil?