Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between Software-as-a-Service (SaaS) applications like Salesforce, SAP, Zendesk, Slack, and ServiceNow, and AWS services like Amazon S3 and Amazon Redshift, in just a few clicks. With AppFlow, you can run data flows at enterprise scale at the frequency you choose - on a schedule, in response to a business event, or on demand. You can configure data transformation capabilities like filtering and validation to generate rich, ready-to-use data as part of the flow itself, without additional steps. AppFlow automatically encrypts data in motion, and it allows users to restrict data from flowing over the public internet for SaaS applications that are integrated with AWS PrivateLink, reducing exposure to security threats. For more details about AppFlow, see https://aws.amazon.com/appflow/.
The AppFlow Custom Connector SDK enables customers and third party developers to build custom source and/or destination connectors for the AppFlow service. With the SDK, you can connect to private APIs, on-premise proprietary systems, and other cloud services by adding to AppFlow's library of connectors (https://aws.amazon.com/appflow/integrations/).
The Amazon AppFlow Custom Connector SDK simplifies connector development through the use of a normalized interface and data model. It provides authentication, pagination, throttling, error handling, deployment scripts, and a test framework. Connectors developed with the SDK can be used privately, shared within your organization or partners, or published on AWS Marketplace (https://aws.amazon.com/marketplace).
This document is a guide for developing AppFlow custom connectors. The SDK includes a complete working example of a sample connector in Python/Java.
A connector translates the normalized AppFlow requests (read and write calls) into requests compatible with the underlying application. The translation covers the protocol, API format, query format, data model, and so on. Connectors support queries, partial (paginated) queries, and write calls (insert, update, upsert, and delete operations).
AppFlow transfers data between source and destination connectors using an incremental query model. It constructs a query based on the Flow configuration and uses that to pull a configurable number of records from the source connector. For subsequent runs, AppFlow pulls only the records that have changed or were created since the last run. For structured data, the queries are based on entities (e.g. Accounts, Orders, Contact, Benefit, Report etc) as declared by a given connector.
AppFlow manages authentication information for each connector and makes those credentials available to the connector with each request for data reads and writes. AppFlow depends on the connector to declare the authentication scheme(s) it supports and the corresponding configuration parameters. It then lets the AppFlow user pick the desired authentication scheme from the list of schemes supported by that connector. Apart from authentication configuration, a connector can also declare implementation-specific runtime settings. Input from AppFlow users is required for these settings during ConnectorProfile creation or Flow creation. AppFlow stores the user credentials in the user's AWS Secrets Manager. The authentication credentials ARN and the runtime settings are stored along with the ConnectorProfile state, and they are passed along with every request to the connector.
AppFlow supports dynamic schema discovery and depends on the connectors to provide the required metadata. It therefore relies on two metadata queries from the connectors:
- listEntities. This API is recursive in nature and provides a hierarchical entity listing based on entityPath. If the listEntities response has hasChildren=true, that indicates that there are more entities in the next level.
- describeEntity. This API provides field-level details of a given entity.
After retrieving the metadata from the connector, AppFlow caches it according to the ttl value that the connector returns in the CacheControl structure of the metadata response. If no ttl value is specified, AppFlow defaults to 1 hour.
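The recursive nature of listEntities can be illustrated with a small traversal. This is a minimal sketch, not SDK code: the `Entity` dataclass, `walk_entities`, and the in-memory `TREE` are hypothetical stand-ins, and it assumes each listing entry exposes an identifier and a has_children flag.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterator, List


@dataclass
class Entity:
    """Hypothetical stand-in for one entry of a listEntities response."""
    entity_identifier: str
    has_children: bool = False


def walk_entities(list_entities: Callable[[str], List[Entity]],
                  entity_path: str = "") -> Iterator[str]:
    """Yield every entity identifier reachable from entity_path, depth first."""
    for entity in list_entities(entity_path):
        yield entity.entity_identifier
        if entity.has_children:
            # has_children=True signals another level below this entity,
            # so the listing call is repeated with the child's path.
            yield from walk_entities(list_entities, entity.entity_identifier)


# Toy hierarchy standing in for a real application's metadata.
TREE: Dict[str, List[Entity]] = {
    "": [Entity("Sales", has_children=True), Entity("Report")],
    "Sales": [Entity("Sales/Accounts"), Entity("Sales/Orders")],
}


def fake_list_entities(entity_path: str) -> List[Entity]:
    """Return the children of entity_path from the toy hierarchy."""
    return TREE.get(entity_path, [])


all_paths = list(walk_entities(fake_list_entities))
print(all_paths)
```

A real connector would answer each call from the underlying application's own metadata API rather than from a dictionary.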
Interfaces | Description |
---|---|
ConfigurationHandler | Defines the functionality to be implemented for configurations, credentials related operations |
MetadataHandler | Defines the functionality to be implemented for metadata operations |
RecordHandler | Defines the functionality to be implemented for record related CRUD operations |
In the next section, we take a closer look at these interfaces. AppFlow uses these interfaces to interact with connectors for fetching data from the source connector or pushing data into the destination connector. It also uses these interfaces for the connector registration using the configuration returned by the connector and also to retrieve metadata dynamically. Therefore, along with the data interfaces, the connector developers are required to implement the interface to declare supported authentication types, configuration, and also the metadata interface which helps the connector declare the supported entities and their shape.
Connector developers are therefore essentially writing the wiring code that translates between the vendor's API and the generic AppFlow custom connector interface. The connector code needs to handle the following:
- Necessary protocol and data model transformations.
- Normalization to standard error-codes.
- If the token was found to be expired, the connector can propagate the exception. Then AppFlow can handle the exception and refresh the token and reissue the request.
- Pagination support.
- Vendor-specific versioning differences.
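As one illustration of the error-normalization item above, a connector for an HTTP-based application might map status codes onto a small set of normalized codes. The sketch below is an assumption-laden example: the `ErrorCode` members and the `normalize_http_status` helper are hypothetical, and the SDK defines its own set of error codes that a real connector should use instead.

```python
from enum import Enum
from typing import Optional


class ErrorCode(Enum):
    """Hypothetical normalized error codes; the SDK ships its own set."""
    INVALID_CREDENTIALS = "InvalidCredentials"
    ACCESS_DENIED = "AccessDenied"
    RESOURCE_NOT_FOUND = "ResourceNotFound"
    RATE_LIMIT_EXCEEDED = "RateLimitExceeded"
    CLIENT_ERROR = "ClientError"
    SERVER_ERROR = "ServerError"


# Status codes with a specific normalized meaning in this sketch.
_STATUS_TO_ERROR = {
    401: ErrorCode.INVALID_CREDENTIALS,  # e.g. an expired or revoked token
    403: ErrorCode.ACCESS_DENIED,
    404: ErrorCode.RESOURCE_NOT_FOUND,
    429: ErrorCode.RATE_LIMIT_EXCEEDED,
}


def normalize_http_status(status_code: int) -> Optional[ErrorCode]:
    """Return a normalized error code, or None for a successful (2xx) call."""
    if 200 <= status_code < 300:
        return None
    if status_code in _STATUS_TO_ERROR:
        return _STATUS_TO_ERROR[status_code]
    if 400 <= status_code < 500:
        return ErrorCode.CLIENT_ERROR
    # Everything else is treated as a server-side failure in this sketch.
    return ErrorCode.SERVER_ERROR


print(normalize_http_status(401))
print(normalize_http_status(503))
```

Surfacing an expired token as a distinct credentials error is what would let the caller decide to refresh the token and reissue the request.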
The ConfigurationHandler allows the connector to declare its runtime settings, authentication configuration, and so on, using the describeConnectorConfiguration method.
It also includes the implementation for the following callbacks:
- validateConnectorRuntimeSettings: to validate user input for connector runtime settings.
- validateCredentials: to validate user credentials entered by the AppFlow user.
class ConfigurationHandler(metaclass=abc.ABCMeta):
    """This abstract base class defines the functionality to be implemented by custom connectors for configurations,
    credentials related operations.
    """

    @abc.abstractmethod
    def describe_connector_configuration(self, request: requests.DescribeConnectorConfigurationRequest) -> \
            responses.DescribeConnectorConfigurationResponse:
        """Describes the Connector Configuration supported by the connector.

        Parameters:
        request (DescribeConnectorConfigurationRequest)

        Return:
        DescribeConnectorConfigurationResponse
        """
        pass

    @abc.abstractmethod
    def validate_connector_runtime_settings(self, request: requests.ValidateConnectorRuntimeSettingsRequest) -> \
            responses.ValidateConnectorRuntimeSettingsResponse:
        """Validates the user inputs corresponding to the connector settings for a given ConnectorRuntimeSettingScope

        Parameters:
        request (ValidateConnectorRuntimeSettingsRequest)

        Return:
        ValidateConnectorRuntimeSettingsResponse
        """
        pass

    @abc.abstractmethod
    def validate_credentials(self, request: requests.ValidateCredentialsRequest) -> \
            responses.ValidateCredentialsResponse:
        """Validates the user provided credentials.

        Parameters:
        request (ValidateCredentialsRequest)

        Return:
        ValidateCredentialsResponse
        """
        pass
The MetadataHandler allows the connector to declare the entity metadata for the underlying application through two methods:
- listEntities
- describeEntity
class MetadataHandler(metaclass=abc.ABCMeta):
    """This abstract base class defines the functionality to be implemented by custom connectors for metadata
    operations.
    """

    @abc.abstractmethod
    def list_entities(self, request: requests.ListEntitiesRequest) -> responses.ListEntitiesResponse:
        """Lists all the entities available in a paginated fashion. This API is recursive in nature
        and provides a hierarchical entity listing based on entityPath. If the ListEntitiesResponse
        returns has_children=true, that indicates that there are more entities in the next level.

        Parameters:
        request (ListEntitiesRequest)

        Return:
        ListEntitiesResponse
        """
        pass

    @abc.abstractmethod
    def describe_entity(self, request: requests.DescribeEntityRequest) -> responses.DescribeEntityResponse:
        """Describes the entity definition with its field level metadata.

        Parameters:
        request (DescribeEntityRequest)

        Return:
        DescribeEntityResponse
        """
        pass
The RecordHandler is where the main functionality of data exchange is implemented:
- retrieveData: looks up records against a batch of ids
- queryData: queries the underlying application by translating the incoming filterExpression, which follows a specific AppFlow DSL (the SDK contains a parser to help parse the filterExpression)
- writeData: writes a batch of records to the underlying application
class RecordHandler(metaclass=abc.ABCMeta):
    """This abstract base class defines the functionality to be implemented by custom connectors for record related
    operations.
    """

    @abc.abstractmethod
    def retrieve_data(self, request: requests.RetrieveDataRequest) -> responses.RetrieveDataResponse:
        """Retrieves the batch of records against a set of identifiers from the source application.

        Parameters:
        request (RetrieveDataRequest)

        Return:
        RetrieveDataResponse
        """
        pass

    @abc.abstractmethod
    def query_data(self, request: requests.QueryDataRequest) -> responses.QueryDataResponse:
        """Queries the source application for records that match the filter expression in the request.

        Parameters:
        request (QueryDataRequest)

        Return:
        QueryDataResponse
        """
        pass

    @abc.abstractmethod
    def write_data(self, request: requests.WriteDataRequest) -> responses.WriteDataResponse:
        """Writes batch of records to the destination application.

        Parameters:
        request (WriteDataRequest)

        Return:
        WriteDataResponse
        """
        pass
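As a rough illustration of how a query_data implementation might serve paginated (partial) queries, the sketch below pages through an in-memory list. It does not use the SDK's request or response classes: `query_page`, `read_all`, and the offset-as-string token are hypothetical choices made for this example.

```python
from typing import Dict, List, Optional, Tuple

Record = Dict[str, object]


def query_page(records: List[Record], max_results: int,
               next_token: Optional[str] = None) -> Tuple[List[Record], Optional[str]]:
    """Return one page of records plus the token for the following page.

    The token is just the next offset encoded as a string (an assumption of
    this sketch); None means there are no more pages.
    """
    if max_results < 1:
        raise ValueError("max_results must be at least 1")
    start = int(next_token) if next_token else 0
    end = start + max_results
    page = records[start:end]
    new_token = str(end) if end < len(records) else None
    return page, new_token


def read_all(records: List[Record], max_results: int) -> List[Record]:
    """Mimic a caller that keeps requesting pages until the token runs out."""
    collected: List[Record] = []
    token: Optional[str] = None
    while True:
        page, token = query_page(records, max_results, token)
        collected.extend(page)
        if token is None:
            break
    return collected


SOURCE = [{"id": i} for i in range(5)]
first_page, first_token = query_page(SOURCE, 2)
print(first_page, first_token)
print(read_all(SOURCE, 2) == SOURCE)
```

In a real connector the token would usually be whatever cursor the underlying application returns, passed through unchanged.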
AppFlow uses an incremental query model to fetch data from the source. The initial pull queries records modified or created in the past 30 days (configurable). From the next flow execution onwards, it runs incremental queries to pull data modified or created since the last execution. The AppFlow query engine takes the user's filtering input from the flow definition, translates it to a normalized DSL, and passes it on to the connectors. Connectors then translate that normalized expression into the application-specific format. The domain-specific language (DSL) that defines the filter expression grammar is specific to AppFlow, for example: "price > 100 and accountName contains 'Alexa'". It is a normalized DSL and is not tied to the DSL of any underlying application. The connector's job is to provide the necessary translation from the AppFlow DSL to the application-specific DSL or other syntax that the underlying application requires. To simplify parsing the filter expression, a parser is included. Please see the README.md under the custom_connector_queryfilter module for details.
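To make the translation step concrete, here is a toy translator that turns the example expression into a SQL-style WHERE clause. It is only a sketch: it is not the parser shipped in the SDK, it handles just flat comparisons joined by and/or, and the names `translate_clause` and `translate_filter` are hypothetical. A real connector should rely on the SDK parser instead.

```python
import re

# One comparison of the toy grammar: <field> <operator> <quoted string or number>.
_COMPARISON = re.compile(
    r"^\s*(?P<field>\w+)\s*(?P<op>>=|<=|!=|=|>|<|contains)\s*"
    r"(?P<value>'[^']*'|\d+(?:\.\d+)?)\s*$"
)


def translate_clause(clause: str) -> str:
    """Translate one comparison into its SQL-style equivalent."""
    match = _COMPARISON.match(clause)
    if match is None:
        raise ValueError(f"Unsupported clause: {clause!r}")
    field, op, value = match.group("field", "op", "value")
    if op == "contains":
        if not value.startswith("'"):
            raise ValueError("contains expects a quoted string")
        # Substring match becomes LIKE with wildcards on both sides.
        return f"{field} LIKE '%{value[1:-1]}%'"
    return f"{field} {op} {value}"


def translate_filter(expression: str) -> str:
    """Translate a flat 'a and b or c' expression, clause by clause.

    Limitation of this sketch: no parentheses, and quoted values must not
    themselves contain ' and ' or ' or '.
    """
    parts = re.split(r"\s+(and|or)\s+", expression.strip())
    translated = [
        part.upper() if index % 2 else translate_clause(part)
        for index, part in enumerate(parts)
    ]
    return " ".join(translated)


where_clause = translate_filter("price > 100 and accountName contains 'Alexa'")
print(where_clause)
```

The point of the example is the shape of the work (normalized expression in, application syntax out), not the parsing technique itself.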
The following is a sample end-to-end processing flow to help understand how AppFlow interacts with the custom connectors using the above interfaces:
- Step 1: The connector implementation declares the supported authentication types using the ConfigurationHandler class. AppFlow uses the describeConnectorConfiguration function to retrieve the connector configuration. The DescribeConnectorConfigurationResponse includes the AuthenticationConfig and ConnectorRuntimeSettings data structures. AuthenticationConfig defines the authentication schemes supported by the connector (OAuth2, BASIC, etc.), and ConnectorRuntimeSettings represents the settings that the connector needs at runtime, for which the AppFlow user provides input (e.g. timeout values or any other implementation-dependent inputs).
- Step 2: The AppFlow customer deploys the connector Lambda stack into their account and registers a Connector against the Lambda using the register_connector AppFlow public API.
- Step 3: The AppFlow customer creates a new ConnectorProfile against the newly registered connector and picks an authentication type (for this example, let's assume they picked OAuth2).
- Step 4: The AppFlow console presents the corresponding authentication flow, takes the user's inputs, and invokes the validateCredentials function on the connector. If that succeeds, the console presents the screen for the "connector runtime settings" declared by the connector. When the user enters values for the settings (for example, var1=10, var2="Foo"), AppFlow invokes the validateConnectorRuntimeSettings function on the connector so that it can validate the input. Upon successful validation, the credentials and connector runtime settings are stored along with the newly created ConnectorProfile.
- Step 5: The AppFlow user then uses that ConnectorProfile to create Flow definitions. During flow creation, AppFlow invokes the listEntities and describeEntity functions on the connector to retrieve the list of supported entities and the details of the specific entity that the user selected in the console or through the API.
- Step 6: When the flow is started, AppFlow invokes the queryData function on the connector to execute the query constructed from the user input in the flow definition.
- Step 7: For incremental flows, AppFlow adds a timestamp-based filter to the query and invokes the connector's queryData function again for each incremental data pull.
- Step 8: If the destination connector is also a custom connector, AppFlow processes the incoming records as per the task definitions, prepares the payload according to the entity shape of the destination connector, and hands it over to the destination connector by invoking the writeData function.
- Clone the custom-connector-example module and modify the implementation to fit the underlying application you are writing the connector for.
- Edit the implementations for the three Lambda handlers: ConfigurationHandler, MetadataHandler and RecordHandler.
- Add unit tests.
- Use the productivity scripts under the custom-connector-tools module to build and export the package into S3 using the AWS CloudFormation template, and then deploy the Lambda stack into your account.
- Log in to the AppFlow console and register a Connector against the new Lambda.
- Create a ConnectorProfile against the newly registered Connector.
- Create a flow using the new Connector and ConnectorProfile.
- Execute the flow.
The Amazon AppFlow documentation is located here: https://docs.aws.amazon.com/appflow/index.html. The API reference guide can be found here: https://docs.aws.amazon.com/appflow/1.0/APIReference/API_Operations.html.
The Amazon AppFlow Custom Connector SDK is available for both Java and Python on GitHub at the following link: https://github.com/orgs/awslabs/repositories?q=aws-appflow-custom-connector-
- Java SDK - You can find the Java SDK in AppFlowJavaSdk/
- Python SDK - You can find the Python SDK .whl file in AppFlowPythonSDK/
Once you have the jar, create the Amazon AppFlow client instance and invoke the AppFlow APIs.
AmazonAppflow appflowClient = AmazonAppflowClientBuilder.standard()
        .withRegion(region)
        .build();
Once you have the Python .whl files, install them with pip. Then create the AppFlow client using boto3 and call the AppFlow APIs.
Commands to install the Python SDK:
python3 -m pip install botocore-1.21.25-py3-none-any.whl
python3 -m pip install boto3-1.18.25-py3-none-any.whl
import boto3
appflowClient = boto3.client('appflow')
In order for AppFlow to invoke the custom connector Lambda function while executing customer flows, the following permissions need to be added, either manually or by using the deploy.sh script provided in the custom-connector-tools package:
{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [{
    "Sid": "appflow-example-01",
    "Effect": "Allow",
    "Principal": {
      "Service": "appflow.amazonaws.com"
    },
    "Action": "lambda:InvokeFunction",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:my-function",
    "Condition": {
      "StringEquals": {
        "AWS:SourceAccount": "123456789012"
      },
      "ArnLike": {
        "AWS:SourceArn": "arn:aws:appflow:us-east-1:123456789012:*"
      }
    }
  }]
}
- source account: the customer's account
- source arn: all of the Amazon AppFlow resources present in the customer's account
The Source Account and Source ARN conditions help to prevent the "confused deputy" security vulnerability. The allow-listed service principal allows all of the AWS accounts under the "appflow" service principal to access the customer Lambda.
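If you prefer to generate this resource policy rather than edit it by hand, a small helper can fill in the region, account, and function name. This is a minimal sketch that only builds the JSON document; `build_appflow_invoke_policy` is a hypothetical helper, and attaching the policy to the function is left to deploy.sh or your own tooling.

```python
import json
from typing import Any, Dict


def build_appflow_invoke_policy(region: str, account_id: str, function_name: str,
                                sid: str = "appflow-example-01") -> Dict[str, Any]:
    """Build the resource policy that lets AppFlow invoke one Lambda function."""
    return {
        "Version": "2012-10-17",
        "Id": "default",
        "Statement": [{
            "Sid": sid,
            "Effect": "Allow",
            "Principal": {"Service": "appflow.amazonaws.com"},
            "Action": "lambda:InvokeFunction",
            "Resource": f"arn:aws:lambda:{region}:{account_id}:function:{function_name}",
            "Condition": {
                # Both conditions scope the grant to AppFlow resources in the
                # customer's own account.
                "StringEquals": {"AWS:SourceAccount": account_id},
                "ArnLike": {"AWS:SourceArn": f"arn:aws:appflow:{region}:{account_id}:*"},
            },
        }],
    }


policy = build_appflow_invoke_policy("us-east-1", "123456789012", "my-function")
print(json.dumps(policy, indent=2))
```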
i) Go to the Amazon AppFlow console and select “Connectors” on the left-side menu.
ii) Click on the “Register New Connector” button on the console.
iii) Your custom connector Lambda function will be available on this webpage. If not, ensure that the appropriate Lambda permissions are set, as described in the section “Updating Lambda Permissions and Resource Policy Required.”
iv) Provide the label for your connector. The label must be unique per account per region. Click on the “Register” button.
v) Once registered, you’ll see the connector in the list of custom connectors.
Invoke the registerConnector public API endpoint with the following request payload:
{
  "connectorLabel":"TestCustomConnector", [The unique label]
  "connectorProvisioningType":"LAMBDA", [The provisioning type of the connector. Currently the only supported value is LAMBDA]
  "connectorProvisioningConfig":{
    "lambda":{
      "lambdaArn":"arn:aws:lambda:us-west-2:364320160620:function:banvipulCustomConnectorBugBash" [The ARN of the deployed Lambda]
    }
  }
}
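The same payload can be assembled in Python before calling the API. The sketch below only builds and prints the request; `build_register_connector_request` is a hypothetical helper, the ARN uses a placeholder account ID, and the commented boto3 call assumes boto3 is installed and AWS credentials are configured.

```python
import json
from typing import Any, Dict


def build_register_connector_request(connector_label: str, lambda_arn: str) -> Dict[str, Any]:
    """Assemble the registerConnector payload for a Lambda-backed connector."""
    return {
        "connectorLabel": connector_label,       # must be unique per account per region
        "connectorProvisioningType": "LAMBDA",   # the only supported value per this guide
        "connectorProvisioningConfig": {
            "lambda": {"lambdaArn": lambda_arn},
        },
    }


request = build_register_connector_request(
    "TestCustomConnector",
    "arn:aws:lambda:us-west-2:123456789012:function:my-function",  # placeholder ARN
)
print(json.dumps(request, indent=2))

# With boto3 available, the dict could be passed as keyword arguments:
#   import boto3
#   boto3.client("appflow").register_connector(**request)
```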
i) Go to the “Connections” link from the left menu and select the registered connector from the list.
ii) Once selected click on the ‘Create Connection’ button.
iii) A pop-up appears asking for the required details to create a connection. Please fill in the details and scroll down to click on the ‘Continue’ button.
iv) Your connection now appears in the list.
For custom connectors, a new parameter has been added in the request object for this API.
{
.............................
.............................
.............................
"connectorLabel":"TestCustomConnector", [The label provided while registering the connector]
.............................
.............................
.............................
}
Please follow the sample API request here https://docs.aws.amazon.com/appflow/1.0/APIReference/API_CreateConnectorProfile.html
i) Click on the Flow tab in the left menu, select Create Flow, then follow the standard AppFlow flow configuration steps.
Follow the sample API request here: https://docs.aws.amazon.com/appflow/1.0/APIReference/API_CreateFlow.html
In addition to this, for the custom connectors we have added the following parameters in the request object for this API:
Go to the Amazon AppFlow console and click on “Connectors” on the left menu. Select “Custom connectors” in the drop-down list.
Call the listConnectors API with the following request format. This lists all the custom connectors registered in your account:
{
"maxResults" : 100 // default is 20 and accepted values are between 1 to 100.
}
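Because maxResults caps each response, listing every connector may take several calls. The sketch below shows one way to loop over pages; it assumes the response carries a "connectors" list and an optional "nextToken", and it is exercised here against a hypothetical `FakeAppFlowClient` rather than a real boto3 client.

```python
from typing import Any, Dict, Iterator, Optional


def iter_connectors(client: Any, page_size: int = 100) -> Iterator[Dict[str, Any]]:
    """Yield connectors page by page until no nextToken is returned."""
    kwargs: Dict[str, Any] = {"maxResults": page_size}
    while True:
        response = client.list_connectors(**kwargs)
        yield from response.get("connectors", [])
        next_token = response.get("nextToken")
        if not next_token:
            break
        kwargs["nextToken"] = next_token


class FakeAppFlowClient:
    """Test double that serves two pages; not part of boto3."""

    _PAGES = {
        None: {"connectors": [{"connectorLabel": "A"}], "nextToken": "page-2"},
        "page-2": {"connectors": [{"connectorLabel": "B"}]},
    }

    def list_connectors(self, maxResults: int, nextToken: Optional[str] = None) -> Dict[str, Any]:
        return self._PAGES[nextToken]


labels = [c["connectorLabel"] for c in iter_connectors(FakeAppFlowClient())]
print(labels)
```

Passing a client created with boto3.client('appflow') in place of the fake should work the same way, provided the response shape matches the assumption above.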
Go to the Amazon AppFlow console, click on “Connectors” on the left menu, then click “View details” on the connector of interest.
Call the describeConnector API with the following request format:
{
"connectorType" : "CustomConnector",
"connectorLabel": <YourConnectorLabel>
}
- After your connector is deployed, you can access the connector through the AWS Lambda console.
- You can go to the test tab, and define the payloads as per the request format and then invoke the Lambda to test this.
- To create the request payload, build the request using the Request model defined in the SDK, convert it to a JSON string, and use that string in the console.
- You can also use our testing tools in custom-connector-tests and custom-connector-integ-test to test your connector.
Please refer to the READMEs below to learn more about the SDK:
- You can find the QueryFilter README in custom-connector-queryfilter
- You can find the Example README in custom-connector-example
- You can find the Deployment Tools README in custom-connector-tools
- You can find the Testing Tools README in custom-connector-tests and custom-connector-integ-test