diff --git a/examples/quickstart_examples/oop_example/Readme.md b/examples/quickstart_examples/oop_example/Readme.md
new file mode 100644
index 0000000..bd15322
--- /dev/null
+++ b/examples/quickstart_examples/oop_example/Readme.md
@@ -0,0 +1,106 @@
+# National Parks Example
+
+This example implements a **connector** that fetches, processes, and stores data from the [National Park Service (NPS) API](https://www.nps.gov/subjects/developer/index.htm). The connector is built with an **Object-Oriented Programming (OOP)** approach, keeping the code modular, maintainable, and reusable.
+
+---
+
+## Overview
+
+The connector retrieves data from the **NPS API**, processes it, and stores it in a structured format across four tables:
+**Parks**, **Alerts**, **Articles**, and **People**. Each table is encapsulated in its own Python file, ensuring single responsibility and clean organization.
+
+The project follows a modular architecture:
+- Each table has its own dedicated class for handling schema definitions and data transformations.
+- A centralized **NPS client** handles API interactions, abstracting away the details of the HTTP requests.
+
+---
+
+## Features
+
+- **Modular design:** Each table and the API client are encapsulated in separate files for clarity and reusability.
+- **Scalable:** Easily extend the connector to accommodate additional tables or API endpoints.
+- **Customizable:** Modify transformations or table structures without affecting unrelated components.
+- **Reliable:** Includes error handling for API interactions and data processing.
+
+---
+
+## Project Structure
+
+```plaintext
+├── park_table.py        # Handles the Parks table
+├── alerts_table.py      # Handles the Alerts table
+├── articles_table.py    # Handles the Articles table
+├── people_table.py      # Handles the People table
+├── nps_client.py        # Handles API initialization and data fetching
+├── configuration.json   # Holds the NPS API key
+├── requirements.txt     # Lists dependencies
+├── Readme.md            # Project documentation
+└── connector.py         # Main file to run the connector
+```
+
+## Run the Connector
+
+Add your NPS API key to `configuration.json`, then run:
+
+```bash
+fivetran debug
+```
+
+## Output
+
+### Parks
+
+Contains detailed information about national parks.
+
+![PARKS](images/Parks.png "Parks Table in DB")
+
+#### Interact with Parks table
+
+```sql
+select * from parks
+```
+
+### Articles
+
+Stores educational and informational articles about national parks.
+
+![Articles](images/Articles.png "Articles Table in DB")
+
+#### Interact with Articles table
+
+```sql
+select * from articles
+```
+
+### Alerts
+
+Captures active alerts for parks.
+
+![Alerts](images/Alerts.png "Alerts Table in DB")
+
+#### Interact with Alerts table
+
+```sql
+select * from alerts
+```
+
+### People
+
+Lists key figures associated with the parks or their history.
+
+![PEOPLE](images/People.png "People Table in DB")
+
+#### Interact with People table
+
+```sql
+select * from people
+```
diff --git a/examples/quickstart_examples/oop_example/alerts_table.py b/examples/quickstart_examples/oop_example/alerts_table.py
new file mode 100644
index 0000000..9d0b6df
--- /dev/null
+++ b/examples/quickstart_examples/oop_example/alerts_table.py
@@ -0,0 +1,95 @@
+from nps_client import NPS
+
+
+class ALERTS(NPS):
+    """
+    A subclass of NPS to handle data related to alerts.
+    Fetches and processes alert data from the NPS API, maps it to a schema,
+    and returns the processed data.
+ """ + + # For more info, see https://www.nps.gov/subjects/developer/index.htm + + @staticmethod + def path(): + """ + Specifies the API endpoint for alerts data. + + Returns: + str: The API path for fetching alerts data. + """ + return "alerts" + + @staticmethod + def primary_key(): + """ + Defines the primary key(s) for the alerts data. + + Returns: + list: A list containing the primary key(s) for the alerts table. + """ + return [ + "alert_id" + ] + + @staticmethod + def assign_schema(): + """ + Assigns the schema for the alerts table, including the table name, + primary key, and column definitions with data types. + + Returns: + dict: A dictionary representing the schema for the alerts table. + """ + return { + "table": ALERTS.path(), + "primary_key": ALERTS.primary_key(), + "columns": { + "alert_id": "STRING", # Unique identifier for the alert + "park_code": "STRING", # Park code related to the alert + "title": "STRING", # Title of the alert + "description": "STRING", # Description of the alert + "category": "STRING", # Category of the alert + "url": "STRING", # URL with more information about the alert + }, + } + + def process_data(self): + """ + Fetches and processes alerts data from the NPS API. Maps raw API data + to the defined schema and returns a processed list of alert information. + + Returns: + list: A list of dictionaries where each dictionary represents an alert + and its associated details mapped to the schema. + """ + # Fetch raw data for alerts using the NPS parent class method + alerts_response = NPS.fetch_data(self, ALERTS.path()) + processed_alerts = [] # List to store processed alerts data + + # Process each alert's data retrieved from the API + for alert in alerts_response[ALERTS.path()]: + # Extract and map fields from the API response + alert_id = alert.get("id", "Unknown ID") # Get alert ID or default to "Unknown ID" + park_code = alert.get("parkCode", "") # Get park ID or default to an empty string + title = alert.get("title", "No Title") # Get alert title or default to "No Title" + description = alert.get("description", "No Description") # Get description or default + category = alert.get("category", "No Category") # Get category or default to "No Category" + url = alert.get("url", "") # Get URL or default to an empty string + + # Map fields to schema-defined column names + col_mapping = { + "alert_id": alert_id, + "park_code": park_code, + "title": title, + "description": description, + "category": category, + "url": url, + } + processed_alerts.append(col_mapping) # Add the processed alert data to the list + + # Return the final processed list of alerts + return processed_alerts diff --git a/examples/quickstart_examples/oop_example/articles_table.py b/examples/quickstart_examples/oop_example/articles_table.py new file mode 100644 index 0000000..f5038d8 --- /dev/null +++ b/examples/quickstart_examples/oop_example/articles_table.py @@ -0,0 +1,106 @@ +from email.policy import default +from typing import List, Optional, Union +from xml.sax.handler import property_encoding + +from nps_client import NPS + + +class ARTICLES(NPS): + """ + A subclass of NPS to handle data related to articles. + Fetches and processes articles data from the NPS API, maps it to a schema, + and returns the processed data. + """ + + # For more info, see https://www.nps.gov/subjects/developer/index.htm + + @staticmethod + def path() -> str: + """ + Specifies the API endpoint for articles data. + + Returns: + str: The API path for fetching articles data. 
+ """ + return "articles" + + @staticmethod + def primary_key(): + """ + Defines the primary key(s) for the articles data. + + Returns: + list: A list containing the primary key(s) for the articles table. + """ + return [ + "article_id" + ] + + @staticmethod + def assign_schema(): + """ + Assigns the schema for the articles table, including the table name, + primary key, and column definitions with data types. + + Returns: + dict: A dictionary representing the schema for the articles table. + """ + return { + "table": ARTICLES.path(), + "primary_key": ARTICLES.primary_key(), + "columns": { + "article_id": "STRING", # Unique identifier for the article + "title": "STRING", # Title of the article + "url": "STRING", # URL to the article + "park_code": "STRING", # Park codes related to the article + "park_names": "STRING", # Names of parks related to the article + "states": "STRING", # States related to the parks in the article + "listing_description": "STRING", # Description of the article + "date": "STRING", # Date the article was published + }, + } + + def process_data(self): + """ + Fetches and processes articles data from the NPS API. Maps raw API data + to the defined schema and returns a processed list of article information. + + Returns: + list: A list of dictionaries where each dictionary represents an article + and its associated details mapped to the schema. + """ + # Fetch raw data for articles using the NPS parent class method + articles_response = NPS.fetch_data(self, ARTICLES.path()) + processed_articles = [] # List to store processed articles data + + # Process each article's data retrieved from the API + for article in articles_response[ARTICLES.path()]: + # Extract and map fields from the API response + article_id = article.get("id", "Unknown ID") # Get article ID or default to "Unknown ID" + title = article.get("title", "No Title") # Get article title or default to "No Title" + url = article.get("url", "") # Get article URL or default to an empty string + + # Extract related parks data as lists of codes, names, and states + park_code = [related_park.get("parkCode", "") for related_park in article.get("relatedParks", [])] + park_names = [related_park.get("fullName", "") for related_park in article.get("relatedParks", [])] + states = [related_park.get("states", "") for related_park in article.get("relatedParks", [])] + + listing_description = article.get("listingDescription", "") # Get description or default to an empty string + date = article.get("date", "") # Get publication date or default to an empty string + + # Map fields to schema-defined column names + col_mapping = { + "article_id": article_id, + "title": title, + "url": url, + "park_code": park_code, + "park_names": park_names, + "states": states, + "listing_description": listing_description, + "date": date, + } + + processed_articles.append(col_mapping) # Add the processed article data to the list + + # Return the final processed list of articles + return processed_articles diff --git a/examples/quickstart_examples/oop_example/configuration.json b/examples/quickstart_examples/oop_example/configuration.json new file mode 100644 index 0000000..392cfb1 --- /dev/null +++ b/examples/quickstart_examples/oop_example/configuration.json @@ -0,0 +1,3 @@ +{ + "api_key": "" +} \ No newline at end of file diff --git a/examples/quickstart_examples/oop_example/connector.py b/examples/quickstart_examples/oop_example/connector.py new file mode 100644 index 0000000..d91eb97 --- /dev/null +++ 
+++ b/examples/quickstart_examples/oop_example/connector.py
@@ -0,0 +1,63 @@
+import json
+
+from fivetran_connector_sdk import Connector
+from fivetran_connector_sdk import Operations as op
+
+from alerts_table import ALERTS
+from articles_table import ARTICLES
+from park_table import PARKS
+from people_table import PEOPLE
+
+# Tables to sync. Add or remove table classes here to change what the connector delivers.
+selected_tables = [PARKS, PEOPLE, ALERTS, ARTICLES]
+
+# Define the schema function, which lets you configure the schema your connector delivers.
+# See the technical reference documentation for more details on the schema function:
+# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema
+# The schema function takes one parameter:
+# - configuration: a dictionary that holds the configuration settings for the connector.
+
+
+def schema(configuration: dict):
+    output = []
+    for table in selected_tables:
+        con = table(configuration=configuration)
+        output.append(con.assign_schema())
+    return output
+
+
+# Define the update function, which is a required function, and is called by Fivetran during each sync.
+# See the technical reference documentation for more details on the update function:
+# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update
+# The function takes two parameters:
+# - configuration: a dictionary containing any secrets or payloads you configure when deploying the connector.
+# - state: a dictionary containing the state checkpointed during the prior sync.
+# The state dictionary is empty for the first sync or for any full re-sync.
+
+def update(configuration: dict, state: dict):
+    for table in selected_tables:
+        con = table(configuration=configuration)
+        data = con.process_data()
+        for row in data:
+            yield op.upsert(table.path(), row)
+
+
+# Create the connector object for Fivetran.
+connector = Connector(update=update, schema=schema)
+
+# Run the connector in debug mode. Adding this block to your `connector.py` allows you to test
+# your connector by running the file directly from your IDE.
+if __name__ == "__main__":
+    print("Running the NPS connector (Parks, Articles, People, and Alerts tables)...")
+    # Open the configuration.json file and load its contents into a dictionary.
+    with open("configuration.json", "r") as f:
+        configuration = json.load(f)
+    connector.debug(configuration=configuration)
+    print("Connector run complete.")
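Note: `update` above re-upserts every fetched row on each sync and never writes state. If you want to experiment with incremental behaviour, the SDK also provides `op.checkpoint`. The following is a minimal, illustrative sketch of such a variant, not part of this patch; the `*_last_synced` state keys are hypothetical, and the NPS endpoints used here are small enough that full re-syncs are fine.

```python
from datetime import datetime, timezone

from fivetran_connector_sdk import Operations as op

from alerts_table import ALERTS
from articles_table import ARTICLES
from park_table import PARKS
from people_table import PEOPLE

selected_tables = [PARKS, PEOPLE, ALERTS, ARTICLES]


def update(configuration: dict, state: dict):
    for table in selected_tables:
        con = table(configuration=configuration)
        for row in con.process_data():
            yield op.upsert(table.path(), row)
        # Record when this table finished syncing; a later sync could read this
        # value back from state to filter or skip work. Purely illustrative here.
        state[f"{table.path()}_last_synced"] = datetime.now(timezone.utc).isoformat()
        yield op.checkpoint(state)
```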
diff --git a/examples/quickstart_examples/oop_example/images/Alerts.png b/examples/quickstart_examples/oop_example/images/Alerts.png
new file mode 100644
index 0000000..5c0eb95
Binary files /dev/null and b/examples/quickstart_examples/oop_example/images/Alerts.png differ
diff --git a/examples/quickstart_examples/oop_example/images/Articles.png b/examples/quickstart_examples/oop_example/images/Articles.png
new file mode 100644
index 0000000..999d37b
Binary files /dev/null and b/examples/quickstart_examples/oop_example/images/Articles.png differ
diff --git a/examples/quickstart_examples/oop_example/images/Parks.png b/examples/quickstart_examples/oop_example/images/Parks.png
new file mode 100644
index 0000000..b40bf1f
Binary files /dev/null and b/examples/quickstart_examples/oop_example/images/Parks.png differ
diff --git a/examples/quickstart_examples/oop_example/images/People.png b/examples/quickstart_examples/oop_example/images/People.png
new file mode 100644
index 0000000..32315a0
Binary files /dev/null and b/examples/quickstart_examples/oop_example/images/People.png differ
diff --git a/examples/quickstart_examples/oop_example/nps_client.py b/examples/quickstart_examples/oop_example/nps_client.py
new file mode 100644
index 0000000..41dc177
--- /dev/null
+++ b/examples/quickstart_examples/oop_example/nps_client.py
@@ -0,0 +1,106 @@
+import requests as rq
+
+from fivetran_connector_sdk import Logging as log
+
+
+class NPS:
+    """
+    A class to interact with the National Park Service (NPS) API.
+    Provides methods to fetch data from the API with an optional limit on the number of results.
+    """
+
+    def __init__(self, configuration) -> None:
+        """
+        Initialize the NPS object with configuration details.
+
+        Args:
+            configuration (dict): Configuration dictionary containing the API key.
+
+        Raises:
+            ValueError: If the API key is not provided in the configuration.
+        """
+        self.api_key = configuration.get("api_key")
+        if not self.api_key:
+            raise ValueError("API key is missing from the configuration.")
+
+        # Default limit for the number of results fetched from the API
+        self.limit = 3
+
+        # Base URL for the NPS API
+        self.base_url = "https://developer.nps.gov/api/v1"
+
+    @staticmethod
+    def path() -> str:
+        """
+        Placeholder method to define the API endpoint path.
+
+        Returns:
+            str: The path to the desired endpoint.
+
+        Raises:
+            NotImplementedError: If not overridden by a subclass.
+        """
+        raise NotImplementedError("Subclasses must define path().")
+
+    @staticmethod
+    def primary_key():
+        """
+        Placeholder method to specify the primary key for the data being fetched.
+
+        Returns:
+            list: The primary key column(s) for the fetched data.
+
+        Raises:
+            NotImplementedError: If not overridden by a subclass.
+        """
+        raise NotImplementedError("Subclasses must define primary_key().")
+
+    @staticmethod
+    def columns():
+        """
+        Placeholder method to define the columns/fields of interest for the data.
+
+        Returns:
+            list: List of columns or fields to extract.
+
+        Raises:
+            NotImplementedError: If not overridden by a subclass.
+        """
+        raise NotImplementedError("Subclasses must define columns().")
+
+    def fetch_data(self, endpoint):
+        """
+        Fetch data from a specific endpoint of the NPS API.
+
+        Args:
+            endpoint (str): The endpoint to fetch data from (e.g., 'parks', 'events').
+
+        Returns:
+            dict: A dictionary keyed by endpoint containing the records fetched from the API.
+                  Contains an empty list for the endpoint in case of an error.
+        """
+        # Parameters for the API request
+        params = {"api_key": self.api_key, "limit": self.limit}
+
+        # Results will store the full response; data will store the extracted 'data' field
+        results = {}
+        data = {}
+        try:
+            # Make a GET request to the specified API endpoint
+            response = rq.get(f"{self.base_url}/{endpoint}", params=params)
+            response.raise_for_status()  # Raise an exception for HTTP errors
+
+            # Parse the JSON response and extract data
+            results[endpoint] = response.json()
+            data[endpoint] = results[endpoint].get("data", [])
+            log.info(f"Number of {endpoint} retrieved: {len(data[endpoint])}")
+        except rq.exceptions.RequestException as e:
+            # Handle exceptions and log a warning message
+            log.warning(f"API request to {endpoint} failed: {e}")
+            return {endpoint: []}
+
+        # Return the extracted data
+        return data
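Note: `fetch_data` above retrieves a single page of at most `self.limit` records. The public NPS API also documents `start`/`limit` query parameters and a `total` count in each response, so a paginated fetch could look like the sketch below. This is an illustrative assumption only; `fetch_all` is a hypothetical helper that is not part of the example, and it could equally be folded into `NPS.fetch_data`.

```python
import requests as rq


def fetch_all(api_key: str, base_url: str, endpoint: str, page_size: int = 50) -> dict:
    """Illustrative paginated fetch for an NPS endpoint (hypothetical helper)."""
    rows = []
    start = 0
    while True:
        params = {"api_key": api_key, "limit": page_size, "start": start}
        response = rq.get(f"{base_url}/{endpoint}", params=params)
        response.raise_for_status()
        payload = response.json()
        batch = payload.get("data", [])
        rows.extend(batch)
        start += len(batch)
        # The NPS API reports the total record count as a string; fall back to
        # the rows seen so far if the field is missing.
        total = int(payload.get("total", start))
        if not batch or start >= total:
            break
    return {endpoint: rows}
```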
+ """ + # Parameters for the API request + params = {"api_key": self.api_key, "limit": self.limit} + + # Results will store the full response; data will store the extracted 'data' field + results = {} + data = {} + try: + # Make a GET request to the specified API endpoint + response = rq.get(f"{self.base_url}/{endpoint}", params=params) + response.raise_for_status() # Raise an exception for HTTP errors + + # Parse the JSON response and extract data + results[endpoint] = response.json() + data[endpoint] = results[endpoint].get("data", []) + print(f"Number of {endpoint} retrieved: {len(data[endpoint])}") + except rq.exceptions.RequestException as e: + # Handle exceptions and log a warning message + log.warning(f"API request to {endpoint} failed: {e}") + return [] + + # Return the extracted data + return data diff --git a/examples/quickstart_examples/oop_example/park_table.py b/examples/quickstart_examples/oop_example/park_table.py new file mode 100644 index 0000000..d98a28b --- /dev/null +++ b/examples/quickstart_examples/oop_example/park_table.py @@ -0,0 +1,96 @@ +from nps_client import NPS + + +class PARKS(NPS): + """ + A subclass of NPS to handle data related to parks. + Fetches and processes park data from the NPS API, maps it to a schema, + and returns the processed data. + """ + + # For more info, see https://www.nps.gov/subjects/developer/index.htm + + @staticmethod + def path() -> str: + """ + Specifies the API endpoint for parks data. + + Returns: + str: The API path for fetching park data. + """ + return "parks" + + @staticmethod + def primary_key(): + """ + Defines the primary key(s) for the parks data. + + Returns: + list: A list containing the primary key(s) for the parks table. + """ + return [ + "park_id" + ] + + @staticmethod + def assign_schema(): + """ + Assigns the schema for the parks table, including the table name, + primary key, and column definitions with data types. + + Returns: + dict: A dictionary representing the schema for the parks table. + """ + return { + "table": PARKS.path(), + "primary_key": PARKS.primary_key(), + "columns": { + "park_id": "STRING", # Unique identifier for the park + "name": "STRING", # Name of the park + "description": "STRING", # Description of the park + "state": "STRING", # State(s) where the park is located + "latitude": "FLOAT", # Latitude of the park's location + "longitude": "FLOAT", # Longitude of the park's location + "activities": "STRING", # Activities available in the park + }, + } + + def process_data(self): + """ + Fetches and processes park data from the NPS API. Maps raw API data + to the defined schema and returns a processed list of park information. + + Returns: + list: A list of dictionaries where each dictionary represents a park + and its associated details mapped to the schema. 
+ """ + # Fetch raw data for parks using the NPS parent class method + parks_response = NPS.fetch_data(self, PARKS.path()) + processed_parks = [] # List to store processed park data + + # Process each park's data retrieved from the API + for park in parks_response[PARKS.path()]: + # Extract and map fields from the API response + park_id = park.get("id", "Unknown ID") # Get park ID or default to "Unknown ID" + name = park.get("fullName", "No Name") # Get park name or default to "No Name" + description = park.get("description", "No Description") # Get description or default + state = ", ".join(park.get("states", [])) # Concatenate states into a single string + latitude = park.get("latitude", None) # Get latitude or default to None + longitude = park.get("longitude", None) # Get longitude or default to None + activities = ", ".join(activity["name"] for activity in park.get("activities", [])) + # Join activity names into a single string + + # Map fields to schema-defined column names + col_mapping = { + "park_id": park_id, + "name": name, + "description": description, + "state": state, + "latitude": float(latitude) if latitude else None, + "longitude": float(longitude) if longitude else None, + "activities": activities + } + processed_parks.append(col_mapping) # Add the processed park data to the list + + # Return the final processed list of parks + return processed_parks \ No newline at end of file diff --git a/examples/quickstart_examples/oop_example/people_table.py b/examples/quickstart_examples/oop_example/people_table.py new file mode 100644 index 0000000..e055d99 --- /dev/null +++ b/examples/quickstart_examples/oop_example/people_table.py @@ -0,0 +1,101 @@ +from typing import List, Optional, Union +from xml.sax.handler import property_encoding + +from nps_client import NPS + + +class PEOPLE(NPS): + """ + A subclass of NPS to handle data related to people. + Fetches and processes people-related data from the NPS API, maps it to a schema, + and returns the processed data. + """ + + # For more info, see https://www.nps.gov/subjects/developer/index.htm + + @staticmethod + def path(): + """ + Specifies the API endpoint for people data. + + Returns: + str: The API path for fetching people data. + """ + return "people" + + @staticmethod + def primary_key(): + """ + Defines the primary key(s) for the people data. + + Returns: + list: A list containing the primary key(s) for the people table. + """ + return [ + "person_id" + ] + + @staticmethod + def assign_schema(): + """ + Assigns the schema for the people table, including the table name, + primary key, and column definitions with data types. + + Returns: + dict: A dictionary representing the schema for the people table. + """ + return { + "table": PEOPLE.path(), + "primary_key": PEOPLE.primary_key(), + "columns": { + "person_id": "STRING", # Unique identifier for the person + "name": "STRING", # Name of the person + "title": "STRING", # Title of the person + "description": "STRING", # Description of the person + "url": "STRING", # URL for more information about the person + "related_parks": "STRING", # Parks related to the person + }, + } + + def process_data(self): + """ + Fetches and processes people data from the NPS API. Maps raw API data + to the defined schema and returns a processed list of people information. + + Returns: + list: A list of dictionaries where each dictionary represents a person + and their associated details mapped to the schema. 
+ """ + # Fetch raw data for people using the NPS parent class method + people_response = NPS.fetch_data(self, PEOPLE.path()) + processed_people = [] # List to store processed people data + + # Process each person's data retrieved from the API + for p in people_response[PEOPLE.path()]: + # Extract and map fields from the API response + person_id = p.get("id", "Unknown ID") # Get person ID or default to "Unknown ID" + name = p.get("title", "No Name") # Get person's name or default to "No Name" + title = p.get("listingDescription", "No Title") # Get title or default to "No Title" + description = p.get("listingDescription", "No Description") # Get description or default + url = p.get("url", "") # Get URL or default to an empty string + related_parks = ", ".join(p["parkCode"] for p in p.get("relatedParks", [])) + # Concatenate related park codes into a single string + + # Map fields to schema-defined column names + col_mapping = { + "person_id": person_id, + "name": name, + "title": title, + "description": description, + "url": url, + "related_parks": related_parks, + } + processed_people.append(col_mapping) # Add the processed person data to the list + + # Return the final processed list of people + return processed_people + + + + +