Programming the SlimLine Raspberry in Python

The Python interpreter is pre-installed on the SlimLine Raspberry module, so you can program in Python, use any of the libraries available on the Internet, and manage the extension modules connected to the CPU module. Extension module management relies on the libeS8CoreMng library that we provide, which contains the definitions and functions for managing the system hardware along with many other useful functions. To check the installed interpreter version, run the interpreter with the --version option, which returns the installed version. Both version 2 and version 3 are installed by default.

pi@raspberrypi:~ $ python --version
Python 2.7.16
pi@raspberrypi:~ $ python3 --version
Python 3.7.3
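
The same check can be made from inside a script, which helps when a program has to run under either interpreter. The following is a minimal sketch using only the standard sys module; the function name interpreter_version is illustrative and is not part of the Ptp175 package.

```python
from __future__ import print_function
import sys


def interpreter_version():
    """Return the running interpreter version as a 'major.minor.micro' string."""
    return "{}.{}.{}".format(*sys.version_info[:3])


print("Running on Python", interpreter_version())
if sys.version_info[0] < 3:
    # Under Python 2 the print function comes from the __future__ import above.
    print("Python 2 interpreter in use")
```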

The Ptp175 demonstration program contains a set of Python examples that can serve as a basis for developing your own applications.

Managing I/O modules from Python

The S8CoreMng.py program initializes the extension bus, activates the watchdog (see the related article), and directly manages the digital and analog I/O modules connected to the extension bus. Copy the program into a folder on the SlimLine and copy the libeS8CoreMng.so library into the same folder; you can then run it with the python interpreter (command sudo python S8CoreMng.py). To modify the program, edit it with any common text editor.
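
The listing loads the library with the relative path './libeS8CoreMng.so', so it only works when the program is started from the folder that contains both files. The sketch below is one possible way to build the library path from the script location instead. The helper name library_path and the folder /home/pi/SlimLine are assumptions chosen for illustration.

```python
from __future__ import print_function
import os


def library_path(script_file, name="libeS8CoreMng.so"):
    """Return the absolute path of the library stored next to script_file."""
    folder = os.path.dirname(os.path.abspath(script_file))
    return os.path.join(folder, name)


# Possible use inside S8CoreMng.py (needs the library on the target system):
#   from ctypes import cdll
#   eLib = cdll.LoadLibrary(library_path(__file__))

# Hypothetical script location, used here only to show the resulting path.
print(library_path("/home/pi/SlimLine/S8CoreMng.py"))
```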

Program "S8CoreMng.py" (Ptp175)
# ******************************************************************************
# PROGRAM "S8CoreMng.py"
# ******************************************************************************
# Bibliography
# https://support.elsist.biz/en/articoli/funzioni-inizializzazione-libreria/
# https://support.elsist.biz/en/articoli/funzioni-gestione-bus-periferico/
# https://support.elsist.biz/en/Articles/watchdog-management-functions/
# https://support.elsist.biz/en/articoli/funzioni-gestione-moduli-periferici/
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
# LOADING LIBRARIES
# ------------------------------------------------------------------------------
# Import other libraries

from __future__ import print_function
import time

# Loading dynamic link libraries.
# If the library is in the "/usr/lib" folder must be accessed directly,
# otherwise the full path must be defined.

from ctypes import *
eLib=cdll.LoadLibrary('./libeS8CoreMng.so')

# ------------------------------------------------------------------------------
# INITIALIZE THE LIBRARY
# ------------------------------------------------------------------------------
# Initialize the library; if an error occurs the program ends.

Result=eLib.eLibInit()
print("eLibInit result:", Result)

if (Result != 0):
    print("eLibInit error")
    quit()

# ------------------------------------------------------------------------------
# PRINT LIBRARY VERSION
# ------------------------------------------------------------------------------
# Define an object that calls the "C" function returning the library version;
# "restype" casts the function's return value to the defined type.

GLVersion=eLib.eGetLibVersion
GLVersion.restype=c_char_p
print("Library version:", GLVersion())

# ------------------------------------------------------------------------------
# PERIPHERAL BUS INITIALIZATION
# ------------------------------------------------------------------------------
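# Define which I2C device is used to manage the extension bus and initialize
# it. As used below, the function returns -1 on error, otherwise the number of
# consecutive watchdog reboots (0 after a normal power up).
# The device name is passed as a byte string so that ctypes hands a "char *"
# to the library under both Python 2 and Python 3.

Result=eLib.ePhrI2cBusInit(b"/dev/i2c-4")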
print("ePhrI2cBusInit result:", Result)

# An error during bus initialization indicates a hardware problem.

if (Result == -1):
    print("ePhrI2cBusInit error")
    quit()

# "Result" is 0 after a normal power up. Otherwise the system has been
# restarted by the watchdog and "Result" is the number of consecutive
# watchdog interventions.

if (Result == 0):
    print("System power up")
else:
    print("System has been rebooted {} times".format(Result))

# After a defined number of system reboots by watchdog interventions, a decision
# must be taken. In this example the program is stopped.

if (Result > 2):
    print("Too many watchdog reboots")
    eLib.eResetWDog() #Watchdog reset, it clears the reboot counter
    quit()

# Set the peripheral bus ready; this activates all the attached modules.
# The second argument is the watchdog timeout in mS (range 1 to 60000), for
# example 2000 for 2 seconds. Setting it to 0, as done here, disables the
# watchdog so the system is never restarted; this is useful while testing.

Result=False
while (Result != True):
    Result=eLib.ePhrI2cBusReady(True, 0)
#   Result=eLib.ePhrI2cBusReady(True, 2000)

# ------------------------------------------------------------------------------
# MANAGE THE EXTENSION MODULES
# ------------------------------------------------------------------------------
# Now it's possible to manage the extension modules.
# Check if some modules are attached to the extension bus.

PModules=eLib.eGetModulesOnPhrI2cBus()
print("On the extension bus are attached {} module(s)".format(PModules))

# Set the analog input acquisition mode "AD_VOLT_0_10_COMMON" on extension
# module 0 channel 0

if (eLib.eSetAnInpMode(0, 0, 2) == False):
    print("Analog input mode set error")

# Define and initialize all variables.

DInp=c_long(0) #Digital inputs value
DIAux=c_long(0) #DI auxiliary buffer
DOut=c_long(0) #Digital outputs value
DOAux=c_long(0) #DO auxiliary buffer
AInp=c_float(0) #Analog input value

# The peripheral modules are managed inside a loop.
# The program quits when DInp.2 is activated.

while (DInp.value&0x04 == 0):

    # Sleep for a while.

    time.sleep(1)

    # Read digital inputs from extension module 0, mode DI_8_LL and store value
    # on DInp variable. If an error occurs the program exits.

    if (eLib.eGetPhrDI(0, 1, pointer(DInp)) == False):
        print("Digital input acquisition error")
        break

    # When the digital inputs change, the acquired value is printed.

    if (DInp.value != DIAux.value):
        DIAux.value=DInp.value #DI auxiliary buffer
        print("DInp:", DInp.value)

    # Copy status of DInp.0 to DOut.0.

    if (DInp.value&0x01):
        DOut.value=DOut.value|0x01
    else:
        DOut.value=DOut.value&0xFE

    # Blink DOut.1

    DOut.value=DOut.value^0x02 #Digital outputs value

    # Manage the digital outputs on extension module 0, mode DO_8_LL
     
    if (eLib.eSetPhrDO(0, 1, pointer(DOut)) == False):
        print("Digital output management error")
        break

    # Read analog input channel 0 from extension module 0 and print its value.

    if (eLib.eGetAnInp(0, 0, pointer(AInp)) == False):
        print("Analog input acquisition error")

    print("AInp:", AInp.value)

# ------------------------------------------------------------------------------
# PROGRAM EXIT
# ------------------------------------------------------------------------------
# Before exiting the program the peripheral ready signal must be reset.

if (eLib.ePhrI2cBusReady(False, 0) == False):
    print("Peripheral bus ready error")

# [End of file]
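
The output handling in the loop of S8CoreMng.py (copy DInp.0 to DOut.0, blink DOut.1) is plain bit masking and can be tried out without any hardware attached. The sketch below reproduces that logic as pure functions; the names copy_bit, toggle_bit and next_outputs are illustrative and do not belong to the libeS8CoreMng library.

```python
from __future__ import print_function


def copy_bit(source, target, bit):
    """Return target with bit number `bit` made equal to the same bit of source."""
    mask = 1 << bit
    if source & mask:
        return target | mask
    return target & ~mask


def toggle_bit(value, bit):
    """Return value with bit number `bit` inverted."""
    return value ^ (1 << bit)


def next_outputs(d_inp, d_out):
    """One loop step: DOut.0 follows DInp.0, DOut.1 blinks; result kept to 8 bits."""
    d_out = copy_bit(d_inp, d_out, 0)
    d_out = toggle_bit(d_out, 1)
    return d_out & 0xFF


print(next_outputs(0x01, 0x00))  # prints 3: bit 0 copied, bit 1 switched on
print(next_outputs(0x00, 0x03))  # prints 0: bit 0 cleared, bit 1 switched off
```

Keeping the mask logic in small functions like these makes it possible to check the behaviour on a desktop PC before running the program on the SlimLine.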

TCP data exchange between Python programs and LogicLab

We have seen how Python can directly manage the I/O modules connected to the system. When building automation applications, however, it is much more convenient to leave the real-time management of the watchdog and the I/O modules to a PLC program created with LogicLab, and to delegate to a Python program the operations typical of high-level languages. In the "PyTCPServer" example the two programs exchange data over TCP in JSON format: the PLC program sends the dividend and divisor values to the Python program, which replies with the result of the division.
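
The message exchange can be sketched as a single function that takes the received bytes and returns the bytes to send back. This is a minimal sketch, not the Ptp175 code: the function name handle_request and the "Error" field are assumptions added for illustration, and the PLC program in this article only looks for the "Result" field.

```python
from __future__ import print_function
import json


def handle_request(raw):
    """Decode a JSON request and return the JSON answer as bytes."""
    try:
        data = json.loads(raw.decode("utf-8"))
        result = float(data["Dividend"]) / float(data["Divisor"])
        answer = {"Result": result}
    except (ValueError, KeyError, TypeError, ZeroDivisionError) as error:
        # "Error" is a hypothetical field, not defined by the original example.
        answer = {"Error": str(error)}
    return json.dumps(answer).encode("utf-8")


print(handle_request(b'{"Dividend": 10.0, "Divisor": 4.0}'))
print(handle_request(b'{"Dividend": 1.0, "Divisor": 0.0}'))
```

Catching the decoding and arithmetic errors keeps the server alive when a request is malformed or the divisor is zero, instead of ending the program with an exception.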

Program "PyTCPServer.py" (Ptp175)
# ******************************************************************************
# PROGRAM "PyTCPServer.py"
# ******************************************************************************
# Bibliography
# https://docs.python.it/html/lib/socket-objects.html
# ------------------------------------------------------------------------------

# ------------------------------------------------------------------------------
# LOADING LIBRARIES
# ------------------------------------------------------------------------------
# Import other libraries

from __future__ import print_function
import time
import socket
import json

# some JSON:
x =  '{ "name":"John", "age":30, "city":"New York"}'
y = json.loads(x)
print(y["age"])

# ------------------------------------------------------------------------------
# CREATE SOCKET
# ------------------------------------------------------------------------------
# Create a socket and ensure that you can restart your server quickly when it
# terminates.

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# Set the server socket's TCP port number and the number of pending
# connections that can be queued. On Linux, ports below 1024 require root
# privileges, so the program must be run with sudo.

sock.bind(('', 1000))
sock.listen(5)
print ("TCP server waiting on port: 1000")

# ------------------------------------------------------------------------------
# MANAGE SOCKET
# ------------------------------------------------------------------------------
# loop waiting for connections (terminate with Ctrl-C)

try:
    while (True):
        NSocket, address = sock.accept()
        print ("Connected from", address)

        # Loop serving the new client

        while (True):
            RxData = NSocket.recv(1024)

            # If the peer closes the connection.
        
            if not RxData:
                print ("Peer has closed the connection")
                break

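            # On a "Quit" command exit from loop. The check comes before the
            # decoding because "Quit" is not valid JSON.

            if (RxData == b"Quit"): break

            # Decode the received data and calculate the result; float()
            # avoids integer division under Python 2.

            JData=json.loads(RxData)
            TxData=json.dumps({"Result":float(JData["Dividend"])/float(JData["Divisor"])})
            print ("Received:%s -> Sent:%s"%(RxData, TxData))

            # Send back the result as bytes.

            NSocket.sendall(TxData.encode())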

        NSocket.close()
        print ("Disconnected from", address)

        # On a "Quit" command exit from program.

        if (RxData == b"Quit"): break

# On exit close the socket.

finally:
    sock.close()

# [End of file]
Program "ST_PyTCPServer" (Ptp175)
PROGRAM ST_PyTCPServer
VAR
    i : UDINT; (* Auxiliary variable *)
    Fp : eFILEP; (* File pointer *)
    Dividend : REAL; (* Dividend value *)
    Divisor : REAL; (* Divisor value *)
    Result : REAL; (* Result value *)
    CaseNr : USINT; (* Program case *)
    TimeBf : UDINT; (* Time buffer (uS) *)
    Request : STRING[ 128 ];  (* Request string *)
    Answer : STRING[ 128 ]; (* Answer string *)
    TCPClient : SysTCPClient; (* TCP client management *)
    JDecode : JSONDecode_v2; (* JSON decode *)
    JEncode : JSONEncode_v1; (* JSON encode *)
END_VAR

// *****************************************************************************
// PROGRAM "ST_PyTCPServer"
// *****************************************************************************
// A TCP client is instantiated; it connects to the Python server, sends it
// a JSON request and receives back a JSON answer.
// -----------------------------------------------------------------------------

    // -------------------------------------------------------------------------
    // INITIALIZATION
    // -------------------------------------------------------------------------
    // First program execution loop initializations.

    IF (SysFirstLoop) THEN
        TCPClient.PeerAdd:=ADR('localhost'); //Peer address
        TCPClient.PeerPort:=1000; //Peer port
        TCPClient.LocalAdd:=ADR('0.0.0.0'); //Local address
        TCPClient.LocalPort:=0; //Local port
        TCPClient.FlushTm:=50; //Flush time (mS)
        TCPClient.LifeTm:=20; //Life time (S)
        TCPClient.RxSize:=128; //Rx buffer size
        TCPClient.TxSize:=128; //Tx buffer size

        CaseNr:=0; //Program case
        TimeBf:=SysGetSysTime(TRUE); //Time buffer (uS)
    END_IF;

    // Manage the TCP client.

    TCPClient(Connect:=TRUE); //TCPClient management

    // -------------------------------------------------------------------------
    // PROGRAM CASES
    // -------------------------------------------------------------------------
    // Manage program cases.

    CASE (CaseNr) OF

        // ---------------------------------------------------------------------
        // Waits 1 second and until the TCP connection is open.

        0:
        IF ((SysGetSysTime(TRUE)-TimeBf) < 1000000) THEN RETURN; END_IF;
        IF NOT(SysFIsOpen(TCPClient.File)) THEN RETURN; END_IF;
        TimeBf:=SysGetSysTime(TRUE); //Time buffer (uS)

        // Randomize the two operands and send them to the Python program.

        Dividend:=1.0+SysGetRandom(TRUE)*1000.0; //Dividend value    
        Divisor:=1.0+SysGetRandom(TRUE)*1000.0; //Divisor value    
        i:=Sysmemset(ADR(Request), 0, SIZEOF(Request)); //Initialize the request string

        JEncode(Object:=ADR(Request), OSize:=SIZEOF(Request), Name:=ADR('Dividend'), VType:=REAL_TYPE, VAddress:=ADR(Dividend), Count:=1);
        JEncode(Object:=ADR(Request), OSize:=SIZEOF(Request), Name:=ADR('Divisor'), VType:=REAL_TYPE, VAddress:=ADR(Divisor), Count:=1);
        i:=Sysfwrite(ADR(Request), TO_INT(Sysstrlen(ADR(Request))), 1, TCPClient.File);
        CaseNr:=CaseNr+1; //Program case

        // ---------------------------------------------------------------------
        // Waits for the Python program answer. A 1 second timeout is managed.

        1:
        IF ((SysGetSysTime(TRUE)-TimeBf) > 1000000) THEN CaseNr:=0; RETURN; END_IF;
        IF (SysFGetIChars(TCPClient.File) = 0) THEN RETURN; END_IF;

        i:=Sysmemset(ADR(Answer), 0, SIZEOF(Answer)); //Initialize the answer string
        i:=Sysfread(ADR(Answer), TO_INT(SIZEOF(Answer)), 1, TCPClient.File);
        JDecode(Object:=ADR(Answer), Name:=ADR('Result'), VType:=REAL_TYPE, VAddress:=ADR(Result), Count:=1);
        CaseNr:=0; //Program case
    END_CASE;

// [End of file]
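
To try the Python server without a PLC program, a small client can stand in for ST_PyTCPServer. The sketch below assumes the server from PyTCPServer.py is listening on port 1000 of the same machine; the function names build_request, parse_answer and request_division are illustrative and not part of the Ptp175 package.

```python
from __future__ import print_function
import json
import socket


def build_request(dividend, divisor):
    """Encode the two operands as the JSON request expected by the server."""
    return json.dumps({"Dividend": dividend, "Divisor": divisor}).encode("utf-8")


def parse_answer(raw):
    """Extract the "Result" value from the JSON answer."""
    return float(json.loads(raw.decode("utf-8"))["Result"])


def request_division(dividend, divisor, host="localhost", port=1000, timeout=1.0):
    """Send one request to the server and return the result of the division."""
    sock = socket.create_connection((host, port), timeout)
    try:
        sock.sendall(build_request(dividend, divisor))
        return parse_answer(sock.recv(1024))
    finally:
        sock.close()


# The two helpers can be checked without a running server.
print(build_request(10.0, 4.0))
print(parse_answer(b'{"Result": 2.5}'))
```

With the server running, calling request_division(10.0, 4.0) from an interactive session should return the quotient computed by the server. The 1 second timeout mirrors the one used by the PLC program.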