Connecting Batch to Snowflake
Modern data architectures demand seamless integration between storage, processing, and activation platforms. Connecting Batch to Snowflake enables a unified, efficient, and scalable approach to leveraging customer data for real-time engagement. Here’s why this integration is essential:
Centralized Data as the Single Source of Truth
Snowflake consolidates customer data across platforms, ensuring Batch can access a single, accurate source of truth. This reduces discrepancies and latency in activating customer engagement strategies.
Event-Driven Architecture
The integration leverages materialized views and streams to cascade data changes efficiently. At the same time, it keeps data movement to a minimum, using Snowflake’s own compute to prepare data for Batch without unnecessary replication or intermediaries, ensuring a secure and streamlined process.
Scalability and Future-Readiness
Snowflake’s elastic design easily handles growing datasets, ensuring Batch can engage with millions of profiles in real time. This setup also supports future innovations, such as AI-driven personalization or advanced segmentation workflows.
By connecting Batch to Snowflake, you unlock a data architecture that prioritizes responsiveness, accuracy, and scalability. It’s the foundation for real-time, personalized customer engagement, built on modern principles of data-driven efficiency.
Step-by-step guide
1. Define data flows
Define what customer data you want to send to Batch, with the guidance of your dedicated Solutions Engineer and Onboarding Manager:
Identify your existing and future use cases (Marketing Automation, Newsletters, Transactional), across all channels (Email, SMS, Push notifications, In-app messaging, Inbox).
Map the data flows needed to create, update and delete user profiles. The Batch team usually creates a “tagging plan” for you, defining which attributes, arrays and events you should send to Batch.
2. Create a dedicated Snowflake User
To ensure security and access control, and to follow Snowflake's best practices for resource management, we recommend creating a dedicated Snowflake user for this Batch connection.
If you choose to create this user, make sure to use BATCH_USER throughout all steps in this guide.
This user must have access to your source database, where your customer data is stored.
CREATE USER BATCH_USER PASSWORD = 'your_secure_password';
CREATE ROLE IF NOT EXISTS BATCH_ROLE;
GRANT ROLE BATCH_ROLE TO USER BATCH_USER;
GRANT USAGE ON DATABASE <YOUR DATABASE> TO ROLE BATCH_ROLE;
GRANT USAGE ON WAREHOUSE <YOUR WAREHOUSE> TO ROLE BATCH_ROLE;
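Depending on how your data is organised, the role will also need read access to the schema and tables that feed the view created in the next step. A minimal sketch, where <YOUR SCHEMA> is a placeholder for the schema holding your customer data:
-- Allow the Batch role to read the source schema and its tables
GRANT USAGE ON SCHEMA <YOUR DATABASE>.<YOUR SCHEMA> TO ROLE BATCH_ROLE;
GRANT SELECT ON ALL TABLES IN SCHEMA <YOUR DATABASE>.<YOUR SCHEMA> TO ROLE BATCH_ROLE;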
3. Create a Materialized View
A materialized view is a pre-computed data set derived from a query specification and stored for later use.
Write an SQL query that computes the desired metric, e.g. the last order date per customer (we will keep this example throughout the following steps). Note that Snowflake materialized views can only query a single table, so any joins should happen upstream of the view.
Create a materialized view to store the precomputed results of this query:
CREATE OR REPLACE MATERIALIZED VIEW customer_last_order_view AS
SELECT custom_id, last_order_date
FROM your_source_table;
Use this view to ensure changes in the underlying tables automatically propagate.
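Before wiring a stream to the view, you can sanity-check its contents with a quick preview query (nothing Batch-specific, just a look at the precomputed rows):
-- Preview a few rows of the materialized view
SELECT custom_id, last_order_date
FROM customer_last_order_view
LIMIT 10;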
4. Set up a Stream on the Materialized View
Streams are like tables, except they only contain the changes (inserts, updates, or deletes) made to their source since they were last consumed. You can use them to capture changes in your customer data.
Create a stream to track changes to the materialized view:
CREATE OR REPLACE STREAM customer_last_order_stream ON VIEW customer_last_order_view;
The stream will detect changes (e.g., when a new order updates the last order date) and propagate these updates downstream.
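To confirm the stream is capturing changes before automating anything, you can check whether it currently holds pending rows. SYSTEM$STREAM_HAS_DATA is a standard Snowflake function; note that simply selecting from a stream does not advance its offset, so the same rows remain until they are consumed by a DML statement:
-- Returns TRUE if the stream contains change records waiting to be consumed
SELECT SYSTEM$STREAM_HAS_DATA('CUSTOMER_LAST_ORDER_STREAM');
-- Preview the pending changes (this does not advance the stream offset)
SELECT custom_id, last_order_date, METADATA$ACTION
FROM customer_last_order_stream;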
5. Write a Python script to query the Stream and call the Batch API
The Batch Profile API is the reference for all your server-side customer data ingestion operations. It enables you, via a single endpoint, to:
Create new user profiles, and associate them with standard fields such as e-mail address, phone number, language/region and user consent levels
Set user properties with different structures (key/value attributes, arrays, objects) and types (string, int, float, bool, URL, date, etc.)
Track events in real time to trigger messages on any channel, and customise them using event properties in the above format.
Snowflake’s Python Connector enables Python applications to connect to Snowflake, execute SQL queries, and manage data.
Create a Python script that queries data from the stream, formats it to match the expected payload and sends it to the Batch Profile API.
Here is a script example:
import snowflake.connector
import requests
import json

# Configuration variables
SNOWFLAKE_ACCOUNT = "your_account"
SNOWFLAKE_USER = "your_username"
SNOWFLAKE_PASSWORD = "your_password"
SNOWFLAKE_DATABASE = "your_database"
SNOWFLAKE_SCHEMA = "your_schema"
SNOWFLAKE_STREAM = "customer_last_order_stream"
BATCH_API_URL = "https://api.batch.com/2.3/profile/update"
BATCH_REST_KEY = "your_batch_rest_key"
BATCH_PROJECT_KEY = "your_batch_project_key"

def query_stream():
    # Connects to Snowflake and queries the stream to retrieve new data
    connection = snowflake.connector.connect(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA,
    )
    cursor = connection.cursor()
    try:
        # Query the stream for new rows.
        # Note: a plain SELECT does not advance the stream offset; rows stay in
        # the stream until they are consumed by a DML statement.
        query = f"SELECT custom_id, last_order_date FROM {SNOWFLAKE_STREAM}"
        cursor.execute(query)
        results = cursor.fetchall()
        # Format the results into a list of dictionaries
        data = [{"custom_id": row[0], "last_order_date": row[1]} for row in results]
        return data
    finally:
        cursor.close()
        connection.close()

def format_payload(data):
    # Formats the data into the payload expected by the Batch API
    return [
        {
            "identifiers": {
                "custom_id": record["custom_id"],
            },
            "attributes": {
                "last_order_date": record["last_order_date"].isoformat()
            },
        }
        for record in data
    ]

def send_to_batch(payload):
    # Sends the formatted data to the Batch API
    headers = {
        "Authorization": f"Bearer {BATCH_REST_KEY}",
        "X-Project-Key": BATCH_PROJECT_KEY,
        "Content-Type": "application/json"
    }
    response = requests.post(BATCH_API_URL, headers=headers, data=json.dumps(payload))
    if response.status_code == 202:
        print("Successfully updated profiles in Batch.")
    else:
        print(f"Failed to update profiles: {response.status_code} {response.text}")

def main():
    # Main workflow to query the stream, format data, and send it to Batch
    print("Querying the Snowflake stream...")
    data = query_stream()
    if not data:
        print("No new data to process.")
        return
    print("Formatting data for Batch API...")
    payload = format_payload(data)
    print("Sending data to Batch API...")
    send_to_batch(payload)

if __name__ == "__main__":
    main()
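Because the script only selects from the stream, Snowflake does not mark those rows as processed: a stream's offset advances only when the stream is consumed inside a DML statement. One hedged way to handle this, assuming a hypothetical audit table batch_sync_log that you would create for this purpose, is to run something like the following after a successful API call (from the script or a follow-up task):
-- batch_sync_log is a hypothetical audit table used only to consume the stream
CREATE TABLE IF NOT EXISTS batch_sync_log (
  custom_id STRING,
  last_order_date DATE,
  synced_at TIMESTAMP_NTZ
);
-- Inserting from the stream advances its offset, so rows are not re-sent next run
INSERT INTO batch_sync_log
SELECT custom_id, last_order_date, CURRENT_TIMESTAMP()
FROM customer_last_order_stream;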
6. Package the script as a Container
Snowflake supports deploying custom Python scripts as containers using Snowpark Container Services.
Package the Python script as a container. Use tools like Docker to create a container image:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "your_script.py"]Create a requirements.txt to include the dependencies:
snowflake-connector-python
requests
Build the container image by running this command in the directory containing the Dockerfile:
docker build -t your_container_name .
Test the container locally to ensure it works by running:
docker run --rm your_container_name
Deploy the container to Snowflake using Snowpark Container Services. This allows the container to be triggered directly by Snowflake tasks.
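Deploying to Snowpark Container Services requires an image repository to push the image to and a compute pool to run it on. The names below (your_image_repository, your_compute_pool) are placeholders for this guide, not values provided by Batch; a minimal sketch:
-- Repository that will hold the container image pushed from your Docker build
CREATE IMAGE REPOSITORY IF NOT EXISTS your_image_repository;
-- Small compute pool that the scheduled job in the next step will run on
CREATE COMPUTE POOL IF NOT EXISTS your_compute_pool
  MIN_NODES = 1
  MAX_NODES = 1
  INSTANCE_FAMILY = CPU_X64_XS;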
7. Schedule execution with a Task
Tasks are scheduled jobs that live right inside Snowflake, so you can automate recurring work without involving separate scheduling software.
Create a task that periodically runs the container as a Snowpark Container Services job. Schedule it according to your needs (e.g., every hour; note that task schedules are expressed in minutes or as CRON expressions):
CREATE OR REPLACE TASK execute_batch_update
  WAREHOUSE = your_warehouse
  SCHEDULE = '60 MINUTE'
AS
  EXECUTE JOB SERVICE
    IN COMPUTE POOL your_compute_pool
    NAME = batch_update_job
    FROM SPECIFICATION $$
    spec:
      containers:
      - name: batch-update
        image: <container_image_reference>
    $$;
Start the task:
ALTER TASK execute_batch_update RESUME;
This task automatically runs every hour, invoking the container that performs the API call to Batch with the latest data.
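To check that the scheduled runs are actually happening, you can query the task history with Snowflake's standard INFORMATION_SCHEMA table function:
-- Most recent executions of the task, with their status and any error message
SELECT name, state, scheduled_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'EXECUTE_BATCH_UPDATE'))
ORDER BY scheduled_time DESC
LIMIT 20;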
8. Test and monitor the workflow
Simulate changes in the source tables to ensure that the materialized view updates correctly, the stream captures the changes, and the Python script correctly formats and sends the data to Batch (a quick way to do this is sketched after this list).
Monitor Snowflake tasks and logs for execution status and debug issues if needed.
Log responses from the Profile API to ensure successful data transfers and handle errors.
You can use Batch’s Profile View to verify that the customer data has reached its destination by looking up a customer ID: How to find a user's profile on the dashboard?
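As a concrete way to simulate a change end to end, using the same example source table as above and a placeholder customer ID to adapt, you can update a row and then confirm the stream picked it up before the next scheduled run:
-- Simulate a new order for one customer in the source table
UPDATE your_source_table
SET last_order_date = CURRENT_DATE()
WHERE custom_id = 'test_customer_id';
-- The materialized view refreshes in the background, so the change may take a
-- short while to appear in the stream before this returns TRUE
SELECT SYSTEM$STREAM_HAS_DATA('CUSTOMER_LAST_ORDER_STREAM');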
You’re now ready to use your data to send personalised and engaging messages to your audience!