Code files of Python nodes¶
Python nodes use the Python executable installed on the system. They connect to Node-BLUE using Unix domain sockets (or IPC sockets). Like this, Node-BLUE has full Python support including all Python modules.
Homegear object¶
To connect to Node-BLUE we need to import the Homegear module and create a Homegear object:
from homegear import Homegear
import sys
socketPath = sys.argv[1]
nodeId = sys.argv[2]
# hg waits until the connection is established (but for a maximum of 2 seonds).
# The socket path is passed in sys.argv[1], the node's ID in sys.argv[2]
hg = Homegear(socketPath, eventHandler, nodeId, nodeInput)
As IPC sockets are used, we need to know the path to the socket. We also need to know the node's ID. Node-BLUE passes the socket path in sys.argv[1]
and the node's ID in sys.argv[2]
.
In addition we need two callback methods: eventHandler
and nodeInput
. The first one is called on any variable update within Homegear (see the Homegear reference). eventHandler
also is called when Node-BLUE is fully started. nodeInput
is called, when a new message arrives. Our example now looks like this:
from homegear import Homegear
import sys
socketPath = sys.argv[1]
nodeId = sys.argv[2]
def eventHandler(eventSource, peerId, channel, variableName, value):
pass
def nodeInput(nodeInfo, inputIndex, msg):
pass
# hg waits until the connection is established (but for a maximum of 2 seonds).
# The socket path is passed in sys.argv[1], the node's ID in sys.argv[2]
hg = Homegear(socketPath, eventHandler, nodeId, nodeInput)
Now we just need to make sure, the Python process keeps running. The recommended way to do so is to check if the Homegear object is still connected to Homegear. Our full Python node template then looks like this:
Python node template¶
from homegear import Homegear
import sys
import time
socketPath = sys.argv[1]
nodeId = sys.argv[2]
# This callback method is called on Homegear variable changes.
def eventHandler(eventSource, peerId, channel, variableName, value):
pass
# This callback method is called when a message arrives on one of the node's inputs.
def nodeInput(nodeInfo, inputIndex, msg):
pass
# hg waits until the connection is established (but for a maximum of 2 seonds).
# The socket path is passed in sys.argv[1], the node's ID in sys.argv[2]
hg = Homegear(socketPath, eventHandler, nodeId, nodeInput)
while hg.connected():
time.sleep(1)
Note
See Using asyncio for thread synchronization for a template with thread synchronization.
Start and stop¶
Every individual Python node is executed in it's own Python process. The node is started in the start
phase of the initialization process and stopped in the stop
phase of the deinitialization process. init
is not available. After the signal 15 is received, the Python process should exit within 30 seconds.
Wait for Node-BLUE start to complete¶
In many cases it is required to know when Node-BLUE start up is complete. For this Node-BLUE sends an event to the Python process and eventHandler
is called with eventSource
set to nodeBlue
, peerId
set to 0
, variableName
set to startUpComplete
and value
set to the node info structure. The challenge now is to notify the main thread about this event as eventHandler
runs in a different thread. For this thread synchronization techniques must be used. In our example we use a condition variable:
from homegear import Homegear
import threading
import sys
import time
hg = None
nodeInfo = None
socketPath = sys.argv[1]
nodeId = sys.argv[2]
startUpComplete = threading.Condition()
# This callback method is called on Homegear variable changes.
def eventHandler(eventSource, peerId, channel, variableName, value):
# When the flows are fully started, "startUpComplete" is set to "true". Wait for this event.
if eventSource == "nodeBlue" and peerId == 0 and variableName == "startUpComplete":
startUpComplete.acquire()
global nodeInfo
nodeInfo = value
startUpComplete.notify()
startUpComplete.release()
# This callback method is called when a message arrives on one of the node's inputs.
def nodeInput(nodeInfo, inputIndex, msg):
pass
# hg waits until the connection is established (but for a maximum of 2 seonds).
# The socket path is passed in sys.argv[1], the node's ID in sys.argv[2]
hg = Homegear(socketPath, eventHandler, nodeId, nodeInput)
# Wait for the flows to start.
startUpComplete.acquire()
while hg.connected():
if startUpComplete.wait(1) == True:
break
startUpComplete.release()
# The node is now fully started.
# The script needs to run permanently. Just stop it, when Homegear is not running anymore.
# Homegear sends a signal 15 (SIGTERM) to the process to stop it.
while hg.connected():
time.sleep(1)
Receiving messages¶
Whenever a new message arrives nodeInput
is executed:
def nodeInput(nodeInfo, inputIndex, msg):
# Do something with 'msg'
pass
Warning
nodeInput
is executed in it's own thread. To communicate with the main thread, thread synchronization is required.
Multiple inputs¶
If the node has more than one input, the input the message arrived at can be read from inputIndex
.
Thread synchronization¶
We recommend to use asyncio
. See the section about asyncio.
Handling errors and logging events¶
If the node encounters an error whilst handling the message or if it wants to write something to the log file, it should call the nodeLog
method:
logLevel = 2
message = "Something really bad happened"
hg.nodeLog(logLevel, message)
Available log levels are:
Log level | Description |
---|---|
1 | Critical |
2 | Error |
3 | Warning |
4 | Info |
5 | Debug |
The log message is written to Homegear's flows log file. Warnings and errors also trigger Catch
nodes and - if not handled by a Catch
node - are written to debug tabs of connected frontends.
Calling Homegear RPC methods¶
For an overview of the available RPC methods, visit Homegear's RPC reference.
All RPC methods are callable on the Homegear object, i. e.:
devices = hg.listDevices()
Sending messages¶
To send a message, call the method nodeOutput
on the Homegear object:
msg = {"payload": "A message from Python."}
hg.nodeOutput(0, msg)
The first parameter is the index of the output, the second parameter is the message to send. Note that msg
must be an Array
and requires at least the entry payload
.
If the node is sending a message in response to having received one, it should reuse the received message rather than create a new message object. This ensures existing properties on the message are preserved for the rest of the flow.
Setting status¶
Whilst running, a node is able to share status information with the editor UI. This is done by calling the nodeEvent
function:
hg.nodeEvent('status/' + nodeId, {"text": "Feeling great", "shape": "dot", "fill": "green"})
Using asyncio for thread synchronization¶
asyncio
probably is the easiest way to synchronize threads. It's also the recommended way to implement asynchronous tasks in Python so it is used by many libraries.
Here's a full example where you don't have to worry about thread synchronization anymore:
from homegear import Homegear
import threading
import sys
import asyncio
socketPath = sys.argv[1]
nodeId = sys.argv[2]
loop = asyncio.get_event_loop()
nodeInfo = None
hg = None
startUpComplete = threading.Condition()
# This callback method is called on Homegear variable changes.
def eventHandler(eventSource, peerId, channel, variableName, value):
# When the flows are fully started, "startUpComplete" is set to "true". Wait for this event.
if eventSource == "nodeBlue" and peerId == 0 and variableName == "startUpComplete":
startUpComplete.acquire()
global nodeInfo
nodeInfo = value
startUpComplete.notify()
startUpComplete.release()
# Just call eventHandlerThreadSafe and do everything else there
asyncio.run_coroutine_threadsafe(eventHandlerThreadSafe(eventSource, peerId, channel, variableName, value), loop).result()
async def eventHandlerThreadSafe(eventSource, peerId, channel, variableName, value):
# Here we don't need to worry about thread synchronization anymore
# Called for every subscribed Homegear variable event
pass
# This callback method is called when a message arrives on one of the node's inputs.
def nodeInput(nodeInfo, inputIndex, message):
# Just call nodeInputThreadSafe and do everything else there
asyncio.run_coroutine_threadsafe(nodeInputThreadSafe(nodeInfo, inputIndex, message), loop).result()
async def nodeInputThreadSafe(nodeInfo, inputIndex, message):
# Here we don't need to worry about thread synchronization anymore
# Called for every new arriving message
pass
async def appstart(loop, hg):
while hg.connected():
# Here is your main loop. We are just sleeping in this example.
await asyncio.sleep(1)
return 0
# hg waits until the connection is established (but for a maximum of 2 seonds).
# The socket path is passed in sys.argv[1], the node's ID in sys.argv[2]
hg = Homegear(socketPath, eventHandler, nodeId], nodeInput);
# Wait for the flows to start.
startUpComplete.acquire()
while hg.connected():
if startUpComplete.wait(1) == True:
break
startUpComplete.release()
# The node is now fully started. Start event loop.
loop.run_until_complete(appstart(loop, hg))