iotedge: How to requeue message that could not be

Posted 2019-08-27 08:48

A publisher module and a consumer module, both custom, are running on an IoT Edge device. The publisher module keeps producing messages at a constant rate, whether or not the consumer module processes them. The consumer module POSTs each message to an external service. When there is no Internet connection, the consumer module should requeue the message so that it is not lost and is tried again later.

I would rather not write an infinite loop to keep retrying; besides, if the module is restarted, the message would be lost. So I would prefer to requeue the message to edgeHub/RocksDB.

Where do I find documentation on the available values of IoTHubMessageDispositionResult? Which value should be returned if a message needs to be requeued?

if message.processed():
    return IoTHubMessageDispositionResult.ACCEPTED
else:
    return IoTHubMessageDispositionResult.??

1 answer
男人必须洒脱
#2 · 2019-08-27 09:20

You don't have to implement your own requeuing of messages. IoT Edge provides offline functionality as described in this blog post and on this documentation page.

The edgeHub will store messages locally on the edge device if there is no connection to the IoT Hub. It will automatically start sending those messages (in the correct order) once the connection is re-established.

You can configure how long edgeHub will buffer messages like this:

"$edgeHub": {
    "properties.desired": {
        "schemaVersion": "1.0",
        "routes": {},
        "storeAndForwardConfiguration": {
            "timeToLiveSecs": 7200
        }
    }
}

The 7200 seconds (2 hours) is also the default if you don't configure anything.
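If the retention period is easier to think about in hours, the fragment above can be generated rather than typed by hand. This is only a sketch: `store_and_forward_config` is a hypothetical helper name, not part of any SDK, and it simply reproduces the structure shown above with the time-to-live converted to seconds.

```python
import json


def store_and_forward_config(hours):
    """Build the $edgeHub fragment shown above for a TTL given in hours.

    Hypothetical helper for illustration; the key names are copied from
    the configuration in the answer.
    """
    return {
        "$edgeHub": {
            "properties.desired": {
                "schemaVersion": "1.0",
                "routes": {},
                "storeAndForwardConfiguration": {
                    # edgeHub expects the time-to-live in seconds.
                    "timeToLiveSecs": int(hours * 3600)
                },
            }
        }
    }


config = store_and_forward_config(2)
print(json.dumps(config, indent=4))
```

Calling the helper with 2 hours yields the same 7200-second value as the default.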

By default, the messages will be written to a folder within the edgeHub docker container. If you want to store them somewhere else you can do so with this configuration:

"edgeHub": {
    "type": "docker",
    "settings": {
        "image": "mcr.microsoft.com/azureiotedge-hub:1.0",
        "createOptions": {
            "HostConfig": {
                "Binds": ["<HostStoragePath>:<ModuleStoragePath>"],
                "PortBindings": {
                    "8883/tcp": [{"HostPort":"8883"}],
                    "443/tcp": [{"HostPort":"443"}],
                    "5671/tcp": [{"HostPort":"5671"}]
                }
            }
        }
    },
    "env": {
        "storageFolder": {
            "value": "<ModuleStoragePath>"
        }
    },
    "status": "running",
    "restartPolicy": "always"
}

Replace <HostStoragePath> and <ModuleStoragePath> with the desired values. Example:

  "createOptions": {
    "HostConfig": {
      "Binds": [
        "/etc/iotedge/storage/:/iotedge/storage/"
      ],
      ...
    }
  }
},
"env": {
  "storageFolder": {
    "value": "/iotedge/storage/"
  },
  ...

Please note that you will probably have to manually give the iotedge user (or all users) access to that folder (using chmod).
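As a rough illustration of that permission step, the host folder could be prepared from Python as well as from the shell. This is a sketch under assumptions: `prepare_storage` is a hypothetical helper, and the mode `0o777` corresponds to the "all users" option mentioned above, which is broader than most setups need. The demo deliberately uses a temporary directory instead of `/etc/iotedge/storage/`.

```python
import os
import stat
import tempfile


def prepare_storage(path, mode=0o777):
    """Create the host storage folder and open up its permissions.

    Hypothetical helper; mode 0o777 grants access to all users, which
    is an assumption made for this example, not a recommendation.
    Returns the permission bits actually set on the folder.
    """
    os.makedirs(path, exist_ok=True)
    # chmod explicitly, because makedirs is subject to the process umask.
    os.chmod(path, mode)
    return stat.S_IMODE(os.stat(path).st_mode)


# Demo against a throwaway directory rather than a real system path.
demo_root = tempfile.mkdtemp()
demo_path = os.path.join(demo_root, "iotedge", "storage")
print(oct(prepare_storage(demo_path)))
```

On a real device the path passed in would be the host side of the bind mount, and the script would typically need root privileges.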

Update:

If you are just looking for the available values of IoTHubMessageDispositionResult you will find the answer here:

class IoTHubMessageDispositionResult(Enum):
    ACCEPTED = 0
    REJECTED = 1
    ABANDONED = 2

Update 2:

Messages that have been ACCEPTED are removed from the message queue because they have been successfully delivered.

Messages that are ABANDONED are added to the message queue again, and sending is retried as defined in the retryPolicy. For more insight into the retryPolicy, you can read this thread.

Messages that are REJECTED are not added to the message queue again.
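Applied to the consumer module from the question, the three dispositions could be wired up roughly as follows. This is a minimal sketch, not the SDK's callback signature: the enum is redefined locally (mirroring the values listed above) so that the snippet runs without the SDK installed, `post_to_service` is a hypothetical stand-in for the HTTP POST, and rejecting empty payloads is an assumption made purely for illustration.

```python
from enum import Enum


class IoTHubMessageDispositionResult(Enum):
    # Local stand-in mirroring the SDK enum values listed above.
    ACCEPTED = 0
    REJECTED = 1
    ABANDONED = 2


def handle_message(payload, post_to_service):
    """Return a disposition for one message.

    post_to_service is a hypothetical callable that POSTs the payload
    and raises ConnectionError when the external service is unreachable.
    """
    if not payload:
        # Assumption: an empty payload can never succeed, so do not requeue it.
        return IoTHubMessageDispositionResult.REJECTED
    try:
        post_to_service(payload)
    except ConnectionError:
        # Transient failure: hand the message back so it is retried later.
        return IoTHubMessageDispositionResult.ABANDONED
    # The POST succeeded, so the message can be removed from the queue.
    return IoTHubMessageDispositionResult.ACCEPTED


def offline(_payload):
    # Simulates the "no Internet connection" case from the question.
    raise ConnectionError("no Internet connection")


def online(_payload):
    # Simulates a successful POST.
    return None


print(handle_message(b"reading", offline))
print(handle_message(b"reading", online))
```

In a real module the return value would come from the message callback registered with the SDK client, and the retry timing would be governed by the retryPolicy rather than by a loop in the module.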
