Adding an oop example #71

Open
wants to merge 2 commits into
base: main
106 changes: 106 additions & 0 deletions examples/quickstart_examples/oop_example/Readme.md
@@ -0,0 +1,106 @@
# National Parks Example

This example implements a Fivetran Connector SDK **connector** that fetches, processes, and stores data from the [National Park Service API (NPS API)](https://www.nps.gov/subjects/developer/index.htm). It uses an **Object-Oriented Programming (OOP)** approach to keep the code modular, maintainable, and reusable.

---

## Overview

This example retrieves data from the **NPS API**, processes it, and stores it in a structured format across four tables:
**Parks**, **Alerts**, **Articles**, and **People**. Each table is encapsulated within its own Python file, ensuring single responsibility and clean organization.

The project follows a modular architecture:
- Each table has its own dedicated class for handling schema definitions and data transformations.
- A centralized **NPS Client** handles API interactions, abstracting away the complexities of HTTP requests.
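
The division of labor described above can be sketched roughly as follows. This is an illustration only: `StubClient`, its canned payload, and the shape of the value returned by `fetch_data` are assumptions standing in for the real `NPS` class in `nps_client.py`, and `AlertsTable` is a trimmed-down hypothetical table class.

```python
class StubClient:
    """Hypothetical stand-in for the shared NPS client (assumed interface)."""

    def __init__(self, configuration: dict):
        self.api_key = configuration.get("api_key", "")

    def fetch_data(self, endpoint: str) -> dict:
        # The real client would call the NPS API; this stub returns canned rows
        # keyed by endpoint name so the example runs offline.
        canned = {"alerts": [{"id": "A1", "parkCode": "yose", "title": "Road closed"}]}
        return {endpoint: canned.get(endpoint, [])}


class AlertsTable(StubClient):
    """Table class: owns its endpoint name, schema, and row mapping."""

    @staticmethod
    def path() -> str:
        return "alerts"

    @staticmethod
    def assign_schema() -> dict:
        return {
            "table": AlertsTable.path(),
            "primary_key": ["alert_id"],
            "columns": {"alert_id": "STRING", "park_code": "STRING", "title": "STRING"},
        }

    def process_data(self) -> list:
        # HTTP details stay in the client; the table only maps fields to columns.
        response = self.fetch_data(self.path())
        return [
            {
                "alert_id": item.get("id", "Unknown ID"),
                "park_code": item.get("parkCode", ""),
                "title": item.get("title", "No Title"),
            }
            for item in response[self.path()]
        ]


rows = AlertsTable(configuration={"api_key": "demo"}).process_data()
print(rows)
```

Because the table class only calls `fetch_data`, the client can be swapped for a stub like this one when testing transformations offline.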

---

## Features

- **Modular Design:** Each table and the API client are encapsulated in separate files for clarity and reusability.
- **Scalable:** Add tables or API endpoints by writing a new table class and registering it in `connector.py`.
- **Customizable:** Modify transformations or table structures without affecting unrelated components.
- **Reliable:** Includes error handling for API interactions and data processing.
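
As one illustration of the error handling mentioned in the last bullet, a fetch call could be wrapped in a small retry helper. This sketch is an assumption, not the code in `nps_client.py`: `with_retries`, its parameters, and `flaky_fetch` are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(call: Callable[[], T], attempts: int = 3, base_delay: float = 0.0) -> T:
    """Run `call`, retrying on any exception with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except Exception:
            # A real client would likely catch narrower, network-related errors.
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


# Simulated fetch that fails twice before succeeding (hypothetical).
calls = {"count": 0}


def flaky_fetch() -> dict:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"alerts": []}


result = with_retries(flaky_fetch, attempts=3)
print(result, calls["count"])
```

The default `base_delay` of zero keeps the demonstration instant; a non-zero value would space out the retries.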

---

## Project Structure

```plaintext
├── park_table.py       # Handles the Parks table
├── alerts_table.py     # Handles the Alerts table
├── articles_table.py   # Handles the Articles table
├── people_table.py     # Handles the People table
├── nps_client.py       # Handles API initialization and data fetching
├── configuration.json  # Holds the NPS API key
├── requirements.txt    # Lists dependencies
├── Readme.md           # Project documentation
└── connector.py        # Main file to run the connector
```


## Run the Connector

```bash
fivetran debug --configuration configuration.json
```
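
The connector reads its API key from `configuration.json`, which ships with an empty `api_key`. A preflight check along these lines could catch a missing key before a run; `load_api_key` is a hypothetical helper, not part of this example or of the SDK.

```python
import json
import os
import tempfile


def load_api_key(path: str) -> str:
    """Return the api_key from a JSON configuration file, or raise if unset."""
    with open(path, "r") as f:
        configuration = json.load(f)
    api_key = configuration.get("api_key", "").strip()
    if not api_key:
        raise ValueError(f"'api_key' is empty or missing in {path}")
    return api_key


# Demonstrate with a temporary file rather than the real configuration.json.
with tempfile.TemporaryDirectory() as tmp:
    config_path = os.path.join(tmp, "configuration.json")
    with open(config_path, "w") as f:
        json.dump({"api_key": "demo-key"}, f)
    key = load_api_key(config_path)

print(key)
```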

## Output

### Parks

Contains detailed information about national parks.

![PARKS](images/Parks.png "Parks Table in DB")

#### Interact with Parks table

```sql
SELECT * FROM parks;
```

### Articles

Stores educational and informational articles about national parks.

![Articles](images/Articles.png "Articles Table in DB")

#### Interact with Articles table

```sql
SELECT * FROM articles;
```

### Alerts

Captures active alerts for parks.

![Alerts](images/Alerts.png "Alerts Table in DB")

#### Interact with Alerts table

```sql
SELECT * FROM alerts;
```

### People

Lists key figures associated with the parks or their history.

![PEOPLE](images/People.png "People Table in DB")

#### Interact with People table

```sql
SELECT * FROM people;
```


95 changes: 95 additions & 0 deletions examples/quickstart_examples/oop_example/alerts_table.py
@@ -0,0 +1,95 @@
from typing import List

from nps_client import NPS


class ALERTS(NPS):
    """
    A subclass of NPS to handle data related to alerts.
    Fetches and processes alert data from the NPS API, maps it to a schema,
    and returns the processed data.
    """

    # For more info, see https://www.nps.gov/subjects/developer/index.htm

    @staticmethod
    def path() -> str:
        """
        Specifies the API endpoint for alerts data.

        Returns:
            str: The API path for fetching alerts data.
        """
        return "alerts"

    @staticmethod
    def primary_key() -> List[str]:
        """
        Defines the primary key(s) for the alerts data.

        Returns:
            list: A list containing the primary key(s) for the alerts table.
        """
        return [
            "alert_id"
        ]

    @staticmethod
    def assign_schema() -> dict:
        """
        Assigns the schema for the alerts table, including the table name,
        primary key, and column definitions with data types.

        Returns:
            dict: A dictionary representing the schema for the alerts table.
        """
        return {
            "table": ALERTS.path(),
            "primary_key": ALERTS.primary_key(),
            "columns": {
                "alert_id": "STRING",  # Unique identifier for the alert
                "park_code": "STRING",  # Park code related to the alert
                "title": "STRING",  # Title of the alert
                "description": "STRING",  # Description of the alert
                "category": "STRING",  # Category of the alert
                "url": "STRING",  # URL with more information about the alert
            },
        }

    def process_data(self) -> List[dict]:
        """
        Fetches and processes alerts data from the NPS API. Maps raw API data
        to the defined schema and returns a processed list of alert information.

        Returns:
            list: A list of dictionaries where each dictionary represents an alert
            and its associated details mapped to the schema.
        """
        # Fetch raw data for alerts using the method inherited from NPS
        alerts_response = self.fetch_data(ALERTS.path())
        processed_alerts = []  # List to store processed alerts data

        # Process each alert's data retrieved from the API
        for alert in alerts_response[ALERTS.path()]:
            # Extract and map fields from the API response
            alert_id = alert.get("id", "Unknown ID")  # Get alert ID or default to "Unknown ID"
            park_code = alert.get("parkCode", "")  # Get park code or default to an empty string
            title = alert.get("title", "No Title")  # Get alert title or default to "No Title"
            description = alert.get("description", "No Description")  # Get description or default
            category = alert.get("category", "No Category")  # Get category or default to "No Category"
            url = alert.get("url", "")  # Get URL or default to an empty string

            # Map fields to schema-defined column names
            col_mapping = {
                "alert_id": alert_id,
                "park_code": park_code,
                "title": title,
                "description": description,
                "category": category,
                "url": url,
            }
            processed_alerts.append(col_mapping)  # Add the processed alert data to the list

        # Return the final processed list of alerts
        return processed_alerts
106 changes: 106 additions & 0 deletions examples/quickstart_examples/oop_example/articles_table.py
@@ -0,0 +1,106 @@
from typing import List

from nps_client import NPS


class ARTICLES(NPS):
    """
    A subclass of NPS to handle data related to articles.
    Fetches and processes articles data from the NPS API, maps it to a schema,
    and returns the processed data.
    """

    # For more info, see https://www.nps.gov/subjects/developer/index.htm

    @staticmethod
    def path() -> str:
        """
        Specifies the API endpoint for articles data.

        Returns:
            str: The API path for fetching articles data.
        """
        return "articles"

    @staticmethod
    def primary_key() -> List[str]:
        """
        Defines the primary key(s) for the articles data.

        Returns:
            list: A list containing the primary key(s) for the articles table.
        """
        return [
            "article_id"
        ]

    @staticmethod
    def assign_schema() -> dict:
        """
        Assigns the schema for the articles table, including the table name,
        primary key, and column definitions with data types.

        Returns:
            dict: A dictionary representing the schema for the articles table.
        """
        return {
            "table": ARTICLES.path(),
            "primary_key": ARTICLES.primary_key(),
            "columns": {
                "article_id": "STRING",  # Unique identifier for the article
                "title": "STRING",  # Title of the article
                "url": "STRING",  # URL to the article
                "park_code": "STRING",  # Park codes related to the article
                "park_names": "STRING",  # Names of parks related to the article
                "states": "STRING",  # States related to the parks in the article
                "listing_description": "STRING",  # Description of the article
                "date": "STRING",  # Date the article was published
            },
        }

    def process_data(self) -> List[dict]:
        """
        Fetches and processes articles data from the NPS API. Maps raw API data
        to the defined schema and returns a processed list of article information.

        Returns:
            list: A list of dictionaries where each dictionary represents an article
            and its associated details mapped to the schema.
        """
        # Fetch raw data for articles using the method inherited from NPS
        articles_response = self.fetch_data(ARTICLES.path())
        processed_articles = []  # List to store processed articles data

        # Process each article's data retrieved from the API
        for article in articles_response[ARTICLES.path()]:
            # Extract and map fields from the API response
            article_id = article.get("id", "Unknown ID")  # Get article ID or default to "Unknown ID"
            title = article.get("title", "No Title")  # Get article title or default to "No Title"
            url = article.get("url", "")  # Get article URL or default to an empty string

            # Extract related parks data as lists of codes, names, and states.
            # Note: these values are lists, while the schema declares the columns as STRING.
            related_parks = article.get("relatedParks", [])
            park_code = [related_park.get("parkCode", "") for related_park in related_parks]
            park_names = [related_park.get("fullName", "") for related_park in related_parks]
            states = [related_park.get("states", "") for related_park in related_parks]

            listing_description = article.get("listingDescription", "")  # Get description or default to an empty string
            date = article.get("date", "")  # Get publication date or default to an empty string

            # Map fields to schema-defined column names
            col_mapping = {
                "article_id": article_id,
                "title": title,
                "url": url,
                "park_code": park_code,
                "park_names": park_names,
                "states": states,
                "listing_description": listing_description,
                "date": date,
            }

            processed_articles.append(col_mapping)  # Add the processed article data to the list

        # Return the final processed list of articles
        return processed_articles
3 changes: 3 additions & 0 deletions examples/quickstart_examples/oop_example/configuration.json
@@ -0,0 +1,3 @@
{
  "api_key": ""
}
63 changes: 63 additions & 0 deletions examples/quickstart_examples/oop_example/connector.py
@@ -0,0 +1,63 @@
import json

from fivetran_connector_sdk import Connector
from fivetran_connector_sdk import Operations as op

from alerts_table import ALERTS
from park_table import PARKS
from articles_table import ARTICLES
from people_table import PEOPLE

# Table classes to sync. The functions below call path(), assign_schema(),
# and process_data() on each of them.
selected_table = [PARKS, PEOPLE, ALERTS, ARTICLES]


# Define the schema function which lets you configure the schema your connector delivers.
# See the technical reference documentation for more details on the schema function:
# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema
# The schema function takes one parameter:
# - configuration: a dictionary that holds the configuration settings for the connector.
def schema(configuration: dict):
    output = []
    for table in selected_table:
        con = table(configuration=configuration)
        schema_dict = con.assign_schema()
        output.append(schema_dict)
    return output


# Define the update function, which is a required function, and is called by Fivetran during each sync.
# See the technical reference documentation for more details on the update function:
# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update
# The function takes two parameters:
# - configuration: dictionary containing any secrets or payloads you configure when deploying the connector.
# - state: a dictionary containing the state checkpointed during the prior sync.
#   The state dictionary is empty for the first sync or for any full re-sync.
def update(configuration: dict, state: dict):
    for table in selected_table:
        con = table(configuration=configuration)
        data = con.process_data()
        for row in data:
            yield op.upsert(table.path(), row)


# Create the connector object for Fivetran.
connector = Connector(update=update, schema=schema)

# Run the connector in debug mode.
if __name__ == "__main__":
    print("Running the NPS connector (Parks, Articles, People, and Alerts tables)...")

    # Open the configuration.json file and load its contents into a dictionary.
    with open("configuration.json", "r") as f:
        configuration = json.load(f)

    # Adding this code to your `connector.py` allows you to test your connector by running your file directly from your IDE.
    connector.debug(configuration=configuration)
    print("Connector run complete.")



