create_pipeline

create_pipeline(**kwargs)

Creates a pipeline. A pipeline consumes messages from a channel and allows you to process the messages before storing them in a data store. You must specify both a channel and a datastore activity and, optionally, as many as 23 additional activities in the pipelineActivities array.

See also: AWS API Documentation

Request Syntax

response = client.create_pipeline(
    pipelineName='string',
    pipelineActivities=[
        {
            'channel': {
                'name': 'string',
                'channelName': 'string',
                'next': 'string'
            },
            'lambda': {
                'name': 'string',
                'lambdaName': 'string',
                'batchSize': 123,
                'next': 'string'
            },
            'datastore': {
                'name': 'string',
                'datastoreName': 'string'
            },
            'addAttributes': {
                'name': 'string',
                'attributes': {
                    'string': 'string'
                },
                'next': 'string'
            },
            'removeAttributes': {
                'name': 'string',
                'attributes': [
                    'string',
                ],
                'next': 'string'
            },
            'selectAttributes': {
                'name': 'string',
                'attributes': [
                    'string',
                ],
                'next': 'string'
            },
            'filter': {
                'name': 'string',
                'filter': 'string',
                'next': 'string'
            },
            'math': {
                'name': 'string',
                'attribute': 'string',
                'math': 'string',
                'next': 'string'
            },
            'deviceRegistryEnrich': {
                'name': 'string',
                'attribute': 'string',
                'thingName': 'string',
                'roleArn': 'string',
                'next': 'string'
            },
            'deviceShadowEnrich': {
                'name': 'string',
                'attribute': 'string',
                'thingName': 'string',
                'roleArn': 'string',
                'next': 'string'
            }
        },
    ],
    tags=[
        {
            'key': 'string',
            'value': 'string'
        },
    ]
)

Parameters

  • pipelineName (string) --

    [REQUIRED]

    The name of the pipeline.

  • pipelineActivities (list) --

    [REQUIRED]

    A list of PipelineActivity objects. Activities perform transformations on your messages, such as removing, renaming, or adding message attributes; filtering messages based on attribute values; invoking your Lambda functions on messages for advanced processing; or performing mathematical transformations to normalize device data.

    The list can contain 2-25 PipelineActivity objects and must include both a channel and a datastore activity. Each entry in the list must contain only one activity. For example:

    pipelineActivities = [ { "channel": { ... } }, { "lambda": { ... } }, ... ]
    • (dict) --

      An activity that performs a transformation on a message.

      • channel (dict) --

        Determines the source of the messages to be processed.

        • name (string) -- [REQUIRED]

          The name of the channel activity.

        • channelName (string) -- [REQUIRED]

          The name of the channel from which the messages are processed.

        • next (string) --

          The next activity in the pipeline.

      • lambda (dict) --

        Runs a Lambda function to modify the message.

        • name (string) -- [REQUIRED]

          The name of the lambda activity.

        • lambdaName (string) -- [REQUIRED]

          The name of the Lambda function that is run on the message.

        • batchSize (integer) -- [REQUIRED]

          The number of messages passed to the Lambda function for processing.

          The Lambda function must be able to process all of these messages within five minutes, which is the maximum timeout duration for Lambda functions.

        • next (string) --

          The next activity in the pipeline.

      • datastore (dict) --

        Specifies where to store the processed message data.

        • name (string) -- [REQUIRED]

          The name of the datastore activity.

        • datastoreName (string) -- [REQUIRED]

          The name of the data store where processed messages are stored.

      • addAttributes (dict) --

        Adds other attributes based on existing attributes in the message.

        • name (string) -- [REQUIRED]

          The name of the addAttributes activity.

        • attributes (dict) -- [REQUIRED]

          A list of 1-50 AttributeNameMapping objects that map an existing attribute to a new attribute.

          Note

          The existing attributes remain in the message, so if you want to remove the originals, use RemoveAttributeActivity.

          • (string) --
            • (string) --
        • next (string) --

          The next activity in the pipeline.

      • removeAttributes (dict) --

        Removes attributes from a message.

        • name (string) -- [REQUIRED]

          The name of the removeAttributes activity.

        • attributes (list) -- [REQUIRED]

          A list of 1-50 attributes to remove from the message.

          • (string) --
        • next (string) --

          The next activity in the pipeline.

      • selectAttributes (dict) --

        Creates a new message using only the specified attributes from the original message.

        • name (string) -- [REQUIRED]

          The name of the selectAttributes activity.

        • attributes (list) -- [REQUIRED]

          A list of the attributes to select from the message.

          • (string) --
        • next (string) --

          The next activity in the pipeline.

      • filter (dict) --

        Filters a message based on its attributes.

        • name (string) -- [REQUIRED]

          The name of the filter activity.

        • filter (string) -- [REQUIRED]

          An expression, similar to a SQL WHERE clause, that must return a Boolean value. Messages that satisfy the condition are passed to the next activity.

        • next (string) --

          The next activity in the pipeline.

      • math (dict) --

        Computes an arithmetic expression using the message's attributes and adds it to the message.

        • name (string) -- [REQUIRED]

          The name of the math activity.

        • attribute (string) -- [REQUIRED]

          The name of the attribute that contains the result of the math operation.

        • math (string) -- [REQUIRED]

          An expression that uses one or more existing attributes and must return an integer value.

        • next (string) --

          The next activity in the pipeline.

      • deviceRegistryEnrich (dict) --

        Adds data from the IoT device registry to your message.

        • name (string) -- [REQUIRED]

          The name of the deviceRegistryEnrich activity.

        • attribute (string) -- [REQUIRED]

          The name of the attribute that is added to the message.

        • thingName (string) -- [REQUIRED]

          The name of the IoT device whose registry information is added to the message.

        • roleArn (string) -- [REQUIRED]

          The ARN of the role that allows access to the device's registry information.

        • next (string) --

          The next activity in the pipeline.

      • deviceShadowEnrich (dict) --

        Adds information from the IoT Device Shadow service to a message.

        • name (string) -- [REQUIRED]

          The name of the deviceShadowEnrich activity.

        • attribute (string) -- [REQUIRED]

          The name of the attribute that is added to the message.

        • thingName (string) -- [REQUIRED]

          The name of the IoT device whose shadow information is added to the message.

        • roleArn (string) -- [REQUIRED]

          The ARN of the role that allows access to the device's shadow.

        • next (string) --

          The next activity in the pipeline.

  • tags (list) --

    Metadata that can be used to manage the pipeline.

    • (dict) --

      A set of key-value pairs that are used to manage the resource.

      • key (string) -- [REQUIRED]

        The tag's key.

      • value (string) -- [REQUIRED]

        The tag's value.
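
The list constraints described for pipelineActivities can be checked on the client before a request is sent. The following is a minimal sketch and not part of boto3: build_activities and check_activities are hypothetical helpers, the activity names (ingest, keep_valid, store) and the filter expression are illustrative, and the sketch assumes that next holds the name of the activity that follows.

```python
# Activity keys taken from the request syntax; each list entry holds exactly one of them.
ACTIVITY_TYPES = {
    "channel", "lambda", "datastore", "addAttributes", "removeAttributes",
    "selectAttributes", "filter", "math", "deviceRegistryEnrich",
    "deviceShadowEnrich",
}


def build_activities(channel_name, datastore_name, filter_expr):
    """Return a three-step chain: channel -> filter -> datastore.

    Assumption: 'next' names the following activity. The activity names
    used here are placeholders chosen for this example.
    """
    return [
        {"channel": {"name": "ingest", "channelName": channel_name,
                     "next": "keep_valid"}},
        {"filter": {"name": "keep_valid", "filter": filter_expr,
                    "next": "store"}},
        {"datastore": {"name": "store", "datastoreName": datastore_name}},
    ]


def check_activities(activities):
    """Check the documented list constraints and return a list of problems."""
    problems = []
    if not 2 <= len(activities) <= 25:
        problems.append("list must contain 2-25 entries")
    kinds = []
    for index, entry in enumerate(activities):
        if len(entry) != 1:
            problems.append(f"entry {index} must contain exactly one activity")
            continue
        kind = next(iter(entry))
        if kind not in ACTIVITY_TYPES:
            problems.append(f"entry {index} has unknown activity type {kind!r}")
        kinds.append(kind)
    for required in ("channel", "datastore"):
        if required not in kinds:
            problems.append(f"missing {required} activity")
    return problems
```

A list that passes the check can then be supplied as pipelineActivities, together with pipelineName, in the create_pipeline call. The check covers only the constraints stated in this section; the service may enforce others.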

Return type

dict

Returns

Response Syntax

{
    'pipelineName': 'string',
    'pipelineArn': 'string'
}

Response Structure

  • (dict) --

    • pipelineName (string) --

      The name of the pipeline.

    • pipelineArn (string) --

      The ARN of the pipeline.

Exceptions

  • IoTAnalytics.Client.exceptions.InvalidRequestException
  • IoTAnalytics.Client.exceptions.ResourceAlreadyExistsException
  • IoTAnalytics.Client.exceptions.InternalFailureException
  • IoTAnalytics.Client.exceptions.ServiceUnavailableException
  • IoTAnalytics.Client.exceptions.ThrottlingException
  • IoTAnalytics.Client.exceptions.LimitExceededException
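
The exceptions listed above are exposed on the client object, so a caller can catch them through client.exceptions. The following sketch shows one way to treat an already existing pipeline as a non-error. create_pipeline_if_absent is a hypothetical helper name, and every other listed exception is left to propagate to the caller.

```python
def create_pipeline_if_absent(client, name, activities):
    """Create the pipeline and return its ARN.

    Returns None when a pipeline with this name already exists.
    'client' is expected to be an IoTAnalytics client.
    """
    try:
        response = client.create_pipeline(
            pipelineName=name,
            pipelineActivities=activities,
        )
    except client.exceptions.ResourceAlreadyExistsException:
        # The name is taken; signal that nothing was created.
        return None
    # The response carries 'pipelineName' and 'pipelineArn'.
    return response["pipelineArn"]
```

With a real client, this might be called as create_pipeline_if_absent(boto3.client('iotanalytics'), 'my_pipeline', activities), where the pipeline name is a placeholder and activities is a valid pipelineActivities list.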