Tower SDK
The Tower SDK provides helpful extensions to your Python code, including easy access to Apache Iceberg tables, LLMs, and orchestrating the execution of multiple apps.
The Tower SDK is installed together with the Tower CLI:
pip install tower
To start using the Tower SDK, import it in your app code:
import tower
You don't need to import the SDK into your app code just to run apps on Tower. If you don't want to access Iceberg tables or LLMs, or to orchestrate several apps in a control flow, you can skip the import step above.
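As a quick orientation, here is a minimal sketch combining the APIs documented below (the table and namespace names are placeholders; this requires a configured Tower catalog):

```python
import tower
import pyarrow as pa

# Define a schema and create the table if it doesn't exist yet
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
])
table = tower.tables("users", namespace="examples").create_if_not_exists(schema)

# Append a few rows, then read everything back as a Polars DataFrame
table.insert(pa.table({"id": [1, 2], "name": ["Alice", "Bob"]}))
df = table.read()
print(df)
```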
Tower Tables
Table
Table is a wrapper around an Iceberg table. It provides methods to read data from and write data to the table.
Table.read
Reads all data from the Iceberg table and returns it as a Polars DataFrame.
This method executes a full table scan and materializes the results into memory as a Polars DataFrame. For large tables, consider using to_polars() to get a LazyFrame that can be processed incrementally.
Returns:
pl.DataFrame: A Polars DataFrame containing all rows from the table.
Example:
import polars as pl

table = tower.tables("my_table").load()
# Read all data into a DataFrame
df = table.read()
# Perform operations on the DataFrame
filtered_df = df.filter(pl.col("age") > 30)
# Get basic statistics
print(df.describe())
Table.to_polars
Converts the table to a Polars LazyFrame for efficient, lazy evaluation.
This method returns a LazyFrame that allows for building complex query plans without immediately executing them. This is particularly useful for:
- Processing large tables that don't fit in memory
- Building complex transformations and aggregations
- Optimizing query performance through Polars' query optimizer
Returns:
pl.LazyFrame: A Polars LazyFrame representing the table data.
Example:
import polars as pl

table = tower.tables("my_table").load()
# Create a lazy query plan
lazy_df = table.to_polars()
# Build complex transformations
result = (lazy_df
    .filter(pl.col("age") > 30)
    .group_by("department")
    .agg(pl.col("salary").mean())
    .sort("department"))
# Execute the plan
final_df = result.collect()
Table.rows_affected
Returns statistics about the number of rows affected by write operations on the table.
This method tracks the cumulative number of rows that have been inserted or updated through operations like insert() and upsert(). Note that delete operations are not currently tracked due to limitations in the underlying Iceberg implementation.
Returns:
RowsAffectedInformation: An object containing:
- inserts (int): Total number of rows inserted
- updates (int): Total number of rows updated
Example:
table = tower.tables("my_table").load()
# Insert some data
table.insert(new_data)
# Upsert some data
table.upsert(updated_data, join_cols=["id"])
# Check the impact of our operations
stats = table.rows_affected()
print(f"Inserted {stats.inserts} rows")
print(f"Updated {stats.updates} rows")
Table.insert
Inserts new rows into the Iceberg table.
This method appends the provided data to the table. The data must be provided as a PyArrow table with a schema that matches the table's schema. The operation is tracked in the table's statistics, incrementing the insert count.
Args:
data (pa.Table): The data to insert into the table. The schema of this table must match the schema of the target table.
Returns:
TTable: The table instance with the newly inserted rows, allowing for method chaining.
Example:
import pyarrow as pa

table = tower.tables("my_table").load()
# Create a PyArrow table with new data
new_data = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "age": [25, 30, 35]
})
# Insert the data
table.insert(new_data)
# Verify the insertion
stats = table.rows_affected()
print(f"Inserted {stats.inserts} rows")
Table.upsert
Performs an upsert operation (update or insert) on the Iceberg table.
This method will:
- Update existing rows if they match the join columns
- Insert new rows if no match is found
The operation is always case-sensitive.
Args:
data (pa.Table): The data to upsert into the table. The schema of this table must match the schema of the target table.
join_cols (Optional[list[str]]): The columns that form the key to match rows on. If not provided, all columns will be used for matching.
Returns:
TTable: The table instance with the upserted rows, allowing for method chaining.
Note:
- The operation is always case-sensitive
- When a match is found, all columns are updated
- When no match is found, the row is inserted
- The operation is tracked in the table's statistics
Example:
import pyarrow as pa

table = tower.tables("my_table").load()
# Create a PyArrow table with data to upsert
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "age": [26, 31, 36]  # Updated ages
})
# Upsert the data using 'id' as the key
table.upsert(data, join_cols=["id"])
# Verify the operation
stats = table.rows_affected()
print(f"Updated {stats.updates} rows")
print(f"Inserted {stats.inserts} rows")
Table.delete
Deletes rows from the Iceberg table that match the specified filter conditions.
This method removes rows from the table based on the provided filter expressions. The operation is always case-sensitive. Note that the number of deleted rows cannot be tracked due to limitations in the underlying Iceberg implementation.
Args:
filters (Union[str, List[pc.Expression]]): The filter conditions to apply. Can be either:
- A single PyArrow compute expression
- A list of PyArrow compute expressions (combined with AND)
- A string expression
Returns:
TTable: The table instance after the delete, allowing for method chaining.
Note:
- The operation is always case-sensitive
- The number of deleted rows cannot be tracked in the table statistics
- To get the number of deleted rows, you would need to compare snapshots
Example:
table = tower.tables("my_table").load()
# Delete rows where age is greater than 30
table.delete(table.column("age") > 30)
# Delete rows matching multiple conditions
table.delete([
table.column("age") > 30,
table.column("department") == "IT"
])
# Delete rows using a string expression
table.delete("age > 30 AND department = 'IT'")
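Since deleted row counts aren't tracked in the table statistics, one workaround (practical only for tables small enough to read fully) is to compare row counts before and after the delete:

```python
# Hedged sketch: infer the number of deleted rows by comparing row counts.
# Assumes `table` was loaded via tower.tables(...).load() and is small
# enough for a full read() into memory.
before = table.read().height
table.delete("age > 30")
after = table.read().height
print(f"Deleted {before - after} rows")
```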
Table.schema
Returns the schema of the table as a PyArrow schema.
This method converts the underlying Iceberg table schema into a PyArrow schema, which can be used for type information and schema validation.
Returns:
pa.Schema: The PyArrow schema representation of the table's structure.
Example:
table = tower.tables("my_table").load()
schema = table.schema()
Table.column
Returns a column from the table as a PyArrow compute expression.
This method is useful for creating column-based expressions that can be used in operations like filtering, sorting, or aggregating data. The returned expression can be used with PyArrow's compute functions.
Args:
name (str): The name of the column to retrieve from the table schema.
Returns:
pa.compute.Expression: A PyArrow compute expression representing the column.
Raises:
ValueError: If the specified column name is not found in the table schema.
Example:
table = tower.tables("my_table").load()
# Create a filter expression for rows where age > 30
age_expr = table.column("age") > 30
# Use the expression in a delete operation
table.delete(age_expr)
TableReference
TableReference.load
Loads an existing Iceberg table from the catalog.
This method resolves the table's namespace and name, then loads the table from the catalog. If the table doesn't exist, this will raise an error. Use create() or create_if_not_exists() to create new tables.
Returns:
Table: A new Table instance wrapping the loaded Iceberg table.
Raises:
TableNotFoundError: If the table doesn't exist in the catalog.
Example:
# Load the existing table
table = tower.tables("my_table", namespace="my_namespace").load()
# Now you can use the table
df = table.read()
TableReference.create
Creates a new Iceberg table with the specified schema.
This method will:
- Resolve the table's namespace (using default if not specified)
- Create the namespace if it doesn't exist
- Create a new table with the provided schema
- Return a Table instance for the newly created table
Args:
schema (pa.Schema): The PyArrow schema defining the structure of the table. This will be converted to an Iceberg schema internally.
Returns:
Table: A new Table instance wrapping the created Iceberg table.
Raises:
TableAlreadyExistsError: If a table with the same name already exists in the namespace.
NamespaceError: If there are issues creating or accessing the namespace.
Example:
import pyarrow as pa

# Define the table schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("age", pa.int32())
])
# Create the table
table = tower.tables("my_table", namespace="my_namespace").create(schema)
# Now you can use the table
table.insert(new_data)
TableReference.create_if_not_exists
Creates a new Iceberg table if it doesn't exist, or returns the existing table.
This method will:
- Resolve the table's namespace (using the default if not specified)
- Create the namespace if it doesn't exist
- Create a new table with the provided schema if it doesn't exist
- Return a Table instance for the newly created or existing table
Unlike create(), this method will not raise an error if the table already exists. Instead, it returns the existing table, making it safe for idempotent operations.
Args:
schema (pa.Schema): The PyArrow schema defining the structure of the table. This will be converted to an Iceberg schema internally. Note that this schema is only used if the table needs to be created.
Returns:
Table: A Table instance wrapping either the newly created or existing Iceberg table.
Raises:
NamespaceError: If there are issues creating or accessing the namespace.
Example:
import pyarrow as pa

# Define the table schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("age", pa.int32())
])
# Create the table if it doesn't exist
table = tower.tables("my_table", namespace="my_namespace").create_if_not_exists(schema)
# This is safe to call multiple times
table = tower.tables("my_table", namespace="my_namespace").create_if_not_exists(schema)
tables
Creates a reference to an Iceberg table that can be used to load or create tables.
This function is the main entry point for working with Iceberg tables in Tower. It returns a TableReference object that can be used to either load an existing table or create a new one. The actual table operations (read, write, etc.) are performed through the Table instance obtained by calling load() or create() on the returned reference.
Args:
name (str): The name of the table to reference. This will be used to either load an existing table or create a new one.
catalog (Union[str, Catalog], optional): The catalog to use. Can be either:
- A string name of the catalog
- A Catalog instance (useful for testing or custom catalog implementations)
Defaults to "default".
namespace (Optional[str], optional): The namespace in which the table exists or should be created. If not provided, a default namespace will be used.
Returns:
TableReference: A reference object that can be used to:
- Load an existing table using load()
- Create a new table using create()
- Create a table if it doesn't exist using create_if_not_exists()
Raises:
CatalogError: If there are issues accessing or loading the specified catalog.
TableNotFoundError: When trying to load a non-existent table (only if load() is called).
Examples:
# Load an existing table from the default catalog
table = tower.tables("my_table").load()
df = table.read()
# Create a new table in a specific namespace
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string())
])
table = tower.tables("new_table", namespace="my_namespace").create(schema)
# Use a specific catalog
table = tower.tables("my_table", catalog="my_catalog").load()
# Create a table if it doesn't exist
table = tower.tables("my_table").create_if_not_exists(schema)
Large Language Models
Llm
A language model interface for the Tower system.
This class provides a unified interface for interacting with language models through different inference providers (e.g. Ollama for local inference, Hugging Face Hub for remote). It abstracts away model name resolution, inference provider selection, and local/remote inference API differences to provide a consistent interface for text generation tasks.
The class supports both chat-based interactions (similar to OpenAI Chat Completions API) and simple prompt-based interactions (similar to legacy OpenAI Completions API).
This class is typically instantiated through the llms() factory function rather than directly.
Examples:
# Create an Llm instance (typically done via the llms() factory function)
llm = tower.llms("llama3.2", max_tokens=1000)
# Use for chat completions
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
]
response = llm.complete_chat(messages)
# Use for simple prompts
response = llm.prompt("What is the capital of France?")
Llm.complete_chat
Mimics the OpenAI Chat Completions API by sending a list of messages to the language model and returning the generated response.
This function provides a unified interface for chat-based interactions with different language model providers (Ollama, Hugging Face Hub, etc.) while maintaining compatibility with the OpenAI Chat Completions API format.
Args:
messages (List): A list of message dictionaries, each containing 'role' and 'content' keys. Follows the OpenAI Chat Completions API message format.
Returns:
str: The generated response from the language model.
Examples:
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello, how are you?"}
]
response = llm.complete_chat(messages)
Llm.prompt
Mimics the old-style OpenAI Completions API by sending a single prompt string to the language model and returning the generated response.
This function provides a simple interface for single-prompt interactions, similar to the legacy OpenAI /v1/completions endpoint. It internally converts the prompt to a chat message format and uses the complete_chat method.
Args:
prompt (str): A single string containing the prompt to send to the language model.
Returns:
str: The generated response from the language model.
Examples:
response = llm.prompt("What is the capital of France?")
llms
Creates a language model instance for the Tower system.
This factory function creates an Llm instance configured with the specified model parameters. It automatically resolves the model name based on the available inference providers (Ollama for local inference, Hugging Face Hub for remote). The max_tokens parameter is used to set the maximum number of tokens to generate in responses.
Args:
model_name (str): Can be a model family name (e.g., "llama3.2", "gemma3.2", "deepseek-r1") or a specific model identifier (e.g., "deepseek-r1:14b", "deepseek-ai/DeepSeek-R1-0528"). The function will automatically resolve the exact model name based on available models in the configured inference provider.
max_tokens (int): Maximum number of tokens to generate in responses. Defaults to 1000.
Returns:
Llm: A configured language model instance that can be used for text generation, chat completions, and other language model interactions.
Raises:
ValueError: If the configured inference router is not supported or if the model cannot be resolved.
Examples:
# Create a language model instance
llm = tower.llms("llama3.2", max_tokens=500)
# Use for chat completions
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"}
]
response = llm.complete_chat(messages)
# Use for simple prompts
response = llm.prompt("What is the capital of France?")
Orchestration
run_app
Runs a Tower application with specified parameters and environment.
This function initiates a new run of a Tower application identified by its slug. The run can be configured with an optional environment override and runtime parameters. If no environment is specified, the default environment from the Tower context is used.
Args:
slug (str): The unique identifier of the application to run.
environment (Optional[str]): The environment to run the application in. If not provided, uses the default environment from the Tower context.
parameters (Optional[Dict[str, str]]): A dictionary of key-value pairs to pass as parameters to the application run.
Returns:
Run: A Run object containing information about the initiated application run, including the app_slug and run number.
Raises:
RuntimeError: If there is an error initiating the run or if the Tower API returns an error response.
Examples:
# Run an app with default environment
run = tower.run_app("my-app")
# Run an app with custom environment and parameters
params = {"input_file": "data.csv", "output_dir": "results"}
run = tower.run_app("my-app", environment="prod", parameters=params)
wait_for_run
Waits for a Tower app run to reach a terminal state by polling the Tower API.
This function continuously polls the Tower API every 2 seconds (defined by WAIT_TIMEOUT) to check the status of the specified run. The function returns when the run reaches a terminal state (exited, errored, cancelled, or crashed).
Args:
run (Run): The Run object containing the app_slug and number of the run to monitor.
timeout (Optional[float]): Maximum time to wait in seconds before raising a TimeoutException. Defaults to one day (86,400 seconds).
raise_on_failure (bool): If True, raises a RunFailedError when the run fails. If False, returns the failed run object. Defaults to False.
Returns:
Run: The final state of the run after completion or failure.
Raises:
TimeoutException: If the specified timeout is reached before the run completes.
RunFailedError: If raise_on_failure is True and the run fails.
UnhandledRunStateException: If the run enters an unexpected state.
UnknownException: If there are persistent problems communicating with the Tower API.
NotFoundException: If the run cannot be found.
UnauthorizedException: If the API key is invalid or unauthorized.
Examples:
# Wait for a run to complete with default settings
run = tower.run_app("my-app")
final_run = tower.wait_for_run(run)
# Wait for a run with custom timeout and failure handling
run = tower.run_app("my-app")
try:
final_run = tower.wait_for_run(run, timeout=3600, raise_on_failure=True)
except RunFailedError as e:
print(f"Run failed: {e}")
wait_for_runs
Waits for multiple Tower app runs to reach terminal states by polling the Tower API.
This function continuously polls the Tower API every 2 seconds (defined by WAIT_TIMEOUT) to check the status of all specified runs. The function returns when all runs reach terminal states (exited, errored, cancelled, or crashed).
Args:
runs (List[Run]): A list of Run objects to monitor.
timeout (Optional[float]): Maximum time to wait in seconds before raising a TimeoutException. Defaults to one day (86,400 seconds).
raise_on_failure (bool): If True, raises a RunFailedError when any run fails. If False, failed runs are returned in the failed_runs list. Defaults to False.
Returns:
tuple[List[Run], List[Run]]: A tuple containing two lists:
- successful_runs: List of runs that completed successfully (status: 'exited')
- failed_runs: List of runs that failed (status: 'crashed', 'cancelled', or 'errored')
Raises:
TimeoutException: If the specified timeout is reached before all runs complete.
RunFailedError: If raise_on_failure is True and any run fails.
UnhandledRunStateException: If a run enters an unexpected state.
UnknownException: If there are persistent problems communicating with the Tower API.
NotFoundException: If any run cannot be found.
UnauthorizedException: If the API key is invalid or unauthorized.
Examples:
# Wait for multiple runs to complete
runs = [tower.run_app("app1"), tower.run_app("app2")]
successful, failed = tower.wait_for_runs(runs)
# Wait for multiple runs with custom settings
runs = [tower.run_app("app1"), tower.run_app("app2")]
try:
successful, failed = tower.wait_for_runs(runs, timeout=3600, raise_on_failure=True)
except RunFailedError as e:
print(f"One or more runs failed: {e}")
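Putting the orchestration functions together, a common pattern is to fan out several runs in parallel, wait for all of them, and then trigger a downstream app only if everything succeeded. A sketch (the app slugs and parameter names are placeholders; this requires a Tower workspace):

```python
import tower

# Fan out: start several upstream apps in parallel (placeholder slugs)
runs = [tower.run_app(slug) for slug in ["extract-a", "extract-b"]]

# Fan in: block until every run reaches a terminal state
successful, failed = tower.wait_for_runs(runs, timeout=3600)

# Only run the downstream app if all upstream runs succeeded
if not failed:
    agg = tower.run_app("aggregate", parameters={"source_count": str(len(successful))})
    tower.wait_for_run(agg, raise_on_failure=True)
else:
    print(f"{len(failed)} upstream run(s) failed; skipping aggregation")
```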