
Tower SDK

The Tower SDK provides helpful extensions to your Python code, including easy access to Apache Iceberg tables and LLMs, and orchestration of multiple apps.

The Tower SDK is installed together with the Tower CLI:

pip install tower

To start using the Tower SDK, import it in your app code:

import tower

Using the SDK (that is, importing it into your app code) is optional for running apps on Tower. If you don't need to access Iceberg tables or LLMs, or to orchestrate several apps in a control flow, you can skip the import step above.

Tower Tables

Table

Table is a wrapper around an Iceberg table. It provides methods to read and write data to the table.

Table.read

Reads all data from the Iceberg table and returns it as a Polars DataFrame.

This method executes a full table scan and materializes the results into memory as a Polars DataFrame. For large tables, consider using to_polars() to get a LazyFrame that can be processed incrementally.

Returns:

  • pl.DataFrame: A Polars DataFrame containing all rows from the table.

Example:

import polars as pl

table = tower.tables("my_table").load()
# Read all data into a DataFrame
df = table.read()
# Perform operations on the DataFrame
filtered_df = df.filter(pl.col("age") > 30)
# Get basic statistics
print(df.describe())

Table.to_polars

Converts the table to a Polars LazyFrame for efficient, lazy evaluation.

This method returns a LazyFrame that allows for building complex query plans without immediately executing them. This is particularly useful for:

  • Processing large tables that don't fit in memory
  • Building complex transformations and aggregations
  • Optimizing query performance through Polars' query optimizer

Returns:

  • pl.LazyFrame: A Polars LazyFrame representing the table data.

Example:

import polars as pl

table = tower.tables("my_table").load()
# Create a lazy query plan
lazy_df = table.to_polars()
# Build complex transformations
result = (
    lazy_df
    .filter(pl.col("age") > 30)
    .group_by("department")
    .agg(pl.col("salary").mean())
    .sort("department")
)
# Execute the plan
final_df = result.collect()

Table.rows_affected

Returns statistics about the number of rows affected by write operations on the table.

This method tracks the cumulative number of rows that have been inserted or updated through operations like insert() and upsert(). Note that delete operations are not currently tracked due to limitations in the underlying Iceberg implementation.

Returns:

  • RowsAffectedInformation: An object containing:
    • inserts (int): Total number of rows inserted
    • updates (int): Total number of rows updated

Example:

table = tower.tables("my_table").load()
# Insert some data
table.insert(new_data)
# Upsert some data
table.upsert(updated_data, join_cols=["id"])
# Check the impact of our operations
stats = table.rows_affected()
print(f"Inserted {stats.inserts} rows")
print(f"Updated {stats.updates} rows")
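The cumulative counting described above can be pictured with a small stand-alone model. This is only a sketch using the standard library: `RowsAffected` and `CountingTable` are hypothetical stand-ins, not Tower classes, and the model keeps the counters only, not the data.

```python
from dataclasses import dataclass


@dataclass
class RowsAffected:
    """Hypothetical stand-in for RowsAffectedInformation."""
    inserts: int = 0
    updates: int = 0


class CountingTable:
    """Toy table that only keeps the counters, not the data."""

    def __init__(self):
        self._stats = RowsAffected()

    def insert(self, rows):
        self._stats.inserts += len(rows)
        return self  # returning self is what makes chaining possible

    def delete(self, rows):
        return self  # deletes leave the counters untouched

    def rows_affected(self):
        return self._stats


table = CountingTable()
table.insert([{"id": 1}, {"id": 2}]).insert([{"id": 3}]).delete([{"id": 1}])
stats = table.rows_affected()
print(f"Inserted {stats.inserts} rows, updated {stats.updates} rows")
```

In this model the counters only ever grow, and a delete does not reduce the insert count, which mirrors the note that deletes are not tracked.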

Table.insert

Inserts new rows into the Iceberg table.

This method appends the provided data to the table. The data must be provided as a PyArrow table with a schema that matches the table's schema. The operation is tracked in the table's statistics, incrementing the insert count.

Args:

  • data (pa.Table): The data to insert into the table. The schema of this table must match the schema of the target table.

Returns:

  • TTable: The table instance with the newly inserted rows, allowing for method chaining.

Example:

import pyarrow as pa

table = tower.tables("my_table").load()
# Create a PyArrow table with new data
new_data = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "age": [25, 30, 35]
})
# Insert the data
table.insert(new_data)
# Verify the insertion
stats = table.rows_affected()
print(f"Inserted {stats.inserts} rows")

Table.upsert

Performs an upsert operation (update or insert) on the Iceberg table.

This method will:

  • Update existing rows if they match the join columns
  • Insert new rows if no match is found

All operations are case-sensitive by default.

Args:

  • data (pa.Table): The data to upsert into the table. The schema of this table must match the schema of the target table.
  • join_cols (Optional[list[str]]): The columns that form the key to match rows on. If not provided, all columns will be used for matching.

Returns:

  • TTable: The table instance with the upserted rows, allowing for method chaining.

Note:

  • The operation is always case-sensitive
  • When a match is found, all columns are updated
  • When no match is found, the row is inserted
  • The operation is tracked in the table's statistics

Example:

table = tower.tables("my_table").load()
# Create a PyArrow table with data to upsert
data = pa.table({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"age": [26, 31, 36] # Updated ages
})
# Upsert the data using 'id' as the key
table.upsert(data, join_cols=["id"])
# Verify the operation
stats = table.rows_affected()
print(f"Updated {stats.updates} rows")
print(f"Inserted {stats.inserts} rows")
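The matching rules can be illustrated on plain Python dictionaries. This is a simplified model of the semantics listed above, not Tower's or Iceberg's implementation: `upsert_rows` is a hypothetical helper, and it counts every match as an update, which may differ from how the SDK reports rows that are already identical.

```python
from typing import Optional


def upsert_rows(existing, incoming, join_cols: Optional[list] = None):
    """Model upsert semantics on lists of dicts.

    Returns (rows, inserts, updates).
    """
    rows = [dict(r) for r in existing]
    inserts = updates = 0
    for new_row in incoming:
        # With no join_cols, every column takes part in the match.
        keys = join_cols if join_cols is not None else list(new_row)
        key = tuple(new_row[k] for k in keys)
        for i, row in enumerate(rows):
            if tuple(row.get(k) for k in keys) == key:
                rows[i] = dict(new_row)  # a match replaces all columns
                updates += 1
                break
        else:
            rows.append(dict(new_row))  # no match: insert
            inserts += 1
    return rows, inserts, updates


existing = [{"id": 1, "name": "Alice", "age": 25},
            {"id": 2, "name": "Bob", "age": 30}]
incoming = [{"id": 2, "name": "Bob", "age": 31},
            {"id": 3, "name": "Charlie", "age": 35}]
rows, inserts, updates = upsert_rows(existing, incoming, join_cols=["id"])
```

Here the row with `id` 2 is matched and replaced, while the row with `id` 3 has no match and is appended.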

Table.delete

Deletes rows from the Iceberg table that match the specified filter conditions.

This method removes rows from the table based on the provided filter expressions. The operation is always case-sensitive. Note that the number of deleted rows cannot be tracked due to limitations in the underlying Iceberg implementation.

Args:

  • filters (Union[str, List[pc.Expression]]): The filter conditions to apply. Can be any of:
    • A single PyArrow compute expression
    • A list of PyArrow compute expressions (combined with AND)
    • A string expression

Returns:

  • TTable: The table instance with the matching rows removed, allowing for method chaining.

Note:

  • The operation is always case-sensitive
  • The number of deleted rows cannot be tracked in the table statistics
  • To get the number of deleted rows, you would need to compare snapshots

Example:

table = tower.tables("my_table").load()
# Delete rows where age is greater than 30
table.delete(table.column("age") > 30)
# Delete rows matching multiple conditions
table.delete([
table.column("age") > 30,
table.column("department") == "IT"
])
# Delete rows using a string expression
table.delete("age > 30 AND department = 'IT'")
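To make the "combined with AND" rule for a list of filters concrete, here is a minimal stand-alone sketch. It uses ordinary Python callables instead of PyArrow expressions, and `delete_matching` is a hypothetical helper, not part of the SDK.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
Predicate = Callable[[Row], bool]


def delete_matching(rows: List[Row], filters: List[Predicate]) -> List[Row]:
    """Return the surviving rows: a row is deleted only if ALL filters match."""
    return [row for row in rows if not all(f(row) for f in filters)]


rows = [
    {"name": "Alice", "age": 25, "department": "IT"},
    {"name": "Bob", "age": 41, "department": "IT"},
    {"name": "Carol", "age": 52, "department": "Sales"},
]
remaining = delete_matching(rows, [
    lambda r: r["age"] > 30,
    lambda r: r["department"] == "IT",
])
remaining_names = [r["name"] for r in remaining]
```

Only Bob satisfies both conditions, so Alice and Carol remain. A row that matches just one of the filters is kept.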

Table.schema

Returns the schema of the table as a PyArrow schema.

This method converts the underlying Iceberg table schema into a PyArrow schema, which can be used for type information and schema validation.

Returns:

  • pa.Schema: The PyArrow schema representation of the table's structure.

Example:

table = tower.tables("my_table").load()
schema = table.schema()

Table.column

Returns a column from the table as a PyArrow compute expression.

This method is useful for creating column-based expressions that can be used in operations like filtering, sorting, or aggregating data. The returned expression can be used with PyArrow's compute functions.

Args:

  • name (str): The name of the column to retrieve from the table schema.

Returns:

  • pa.compute.Expression: A PyArrow compute expression representing the column.

Raises:

  • ValueError: If the specified column name is not found in the table schema.

Example:

table = tower.tables("my_table").load()
# Create a filter expression for rows where age > 30
age_expr = table.column("age") > 30
# Use the expression in a delete operation
table.delete(age_expr)

TableReference

TableReference.load

Loads an existing Iceberg table from the catalog.

This method resolves the table's namespace and name, then loads the table from the catalog. If the table doesn't exist, this will raise an error. Use create() or create_if_not_exists() to create new tables.

Returns:

  • Table: A new Table instance wrapping the loaded Iceberg table.

Raises:

  • TableNotFoundError: If the table doesn't exist in the catalog.

Example:

# Load the existing table
table = tower.tables("my_table", namespace="my_namespace").load()
# Now you can use the table
df = table.read()

TableReference.create

Creates a new Iceberg table with the specified schema.

This method will:

  1. Resolve the table's namespace (using default if not specified)
  2. Create the namespace if it doesn't exist
  3. Create a new table with the provided schema
  4. Return a Table instance for the newly created table

Args:

  • schema (pa.Schema): The PyArrow schema defining the structure of the table. This will be converted to an Iceberg schema internally.

Returns:

  • Table: A new Table instance wrapping the created Iceberg table.

Raises:

  • TableAlreadyExistsError: If a table with the same name already exists in the namespace.
  • NamespaceError: If there are issues creating or accessing the namespace.

Example:

# Define the table schema
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("age", pa.int32())
])
# Create the table
table = tower.tables("my_table", namespace="my_namespace").create(schema)
# Now you can use the table
table.insert(new_data)

TableReference.create_if_not_exists

Creates a new Iceberg table if it doesn't exist, or returns the existing table.

This method will:

  1. Resolve the table's namespace (using default if not specified)
  2. Create the namespace if it doesn't exist
  3. Create a new table with the provided schema if it doesn't exist
  4. Return the existing table if it already exists
  5. Return a Table instance for the table

Unlike create(), this method will not raise an error if the table already exists. Instead, it will return the existing table, making it safe for idempotent operations.

Args:

  • schema (pa.Schema): The PyArrow schema defining the structure of the table. This will be converted to an Iceberg schema internally. Note that this schema is only used if the table needs to be created.

Returns:

  • Table: A Table instance wrapping either the newly created or existing Iceberg table.

Raises:

  • NamespaceError: If there are issues creating or accessing the namespace.

Example:

# Define the table schema
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string()),
pa.field("age", pa.int32())
])
# Create the table if it doesn't exist
table = tower.tables("my_table", namespace="my_namespace").create_if_not_exists(schema)
# This is safe to call multiple times
table = tower.tables("my_table", namespace="my_namespace").create_if_not_exists(schema)
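The difference between create() and create_if_not_exists() can be sketched with an in-memory catalog. Everything here is hypothetical: `InMemoryCatalog` and `TableExists` are illustrative stand-ins, and schemas are plain dictionaries rather than PyArrow schemas.

```python
class TableExists(Exception):
    """Hypothetical stand-in for TableAlreadyExistsError."""


class InMemoryCatalog:
    def __init__(self):
        self.namespaces = {}  # namespace -> {table name -> schema}

    def create(self, namespace, name, schema):
        # Create the namespace if it is missing, then the table.
        tables = self.namespaces.setdefault(namespace, {})
        if name in tables:
            raise TableExists(f"{namespace}.{name}")
        tables[name] = schema
        return tables[name]

    def create_if_not_exists(self, namespace, name, schema):
        tables = self.namespaces.setdefault(namespace, {})
        # The schema argument is only used when the table has to be created.
        return tables.setdefault(name, schema)


catalog = InMemoryCatalog()
first = catalog.create_if_not_exists("my_namespace", "my_table", {"id": "int64"})
second = catalog.create_if_not_exists("my_namespace", "my_table", {"id": "string"})

try:
    catalog.create("my_namespace", "my_table", {"id": "int64"})
    create_raised = False
except TableExists:
    create_raised = True
```

The second call returns the table created by the first one and ignores the new schema, while a plain create on the same name raises.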

tables

Creates a reference to an Iceberg table that can be used to load or create tables.

This function is the main entry point for working with Iceberg tables in Tower. It returns a TableReference object that can be used to either load an existing table or create a new one. The actual table operations (read, write, etc.) are performed through the Table instance obtained by calling load() or create() on the returned reference.

Args:

  • name (str): The name of the table to reference. This will be used to either load an existing table or create a new one.
  • catalog (Union[str, Catalog], optional): The catalog to use. Can be either:
    • A string name of the catalog (defaults to "default")
    • A Catalog instance (useful for testing or custom catalog implementations)
  • namespace (Optional[str], optional): The namespace in which the table exists or should be created. If not provided, a default namespace will be used.

Returns:

  • TableReference: A reference object that can be used to:
    • Load an existing table using load()
    • Create a new table using create()
    • Create a table if it doesn't exist using create_if_not_exists()

Raises:

  • CatalogError: If there are issues accessing or loading the specified catalog.
  • TableNotFoundError: When trying to load a non-existent table (only if load() is called).

Examples:

# Load an existing table from the default catalog
table = tower.tables("my_table").load()
df = table.read()

# Create a new table in a specific namespace
schema = pa.schema([
pa.field("id", pa.int64()),
pa.field("name", pa.string())
])
table = tower.tables("new_table", namespace="my_namespace").create(schema)

# Use a specific catalog
table = tower.tables("my_table", catalog="my_catalog").load()

# Create a table if it doesn't exist
table = tower.tables("my_table").create_if_not_exists(schema)

Orchestration

run_app

Runs a Tower application with specified parameters and environment.

This function initiates a new run of a Tower application identified by its slug. The run can be configured with an optional environment override and runtime parameters. If no environment is specified, the default environment from the Tower context is used.

Args:

  • slug (str): The unique identifier of the application to run.
  • environment (Optional[str]): The environment to run the application in. If not provided, uses the default environment from the Tower context.
  • parameters (Optional[Dict[str, str]]): A dictionary of key-value pairs to pass as parameters to the application run.

Returns:

  • Run: A Run object containing information about the initiated application run, including the app_slug and run number.

Raises:

  • RuntimeError: If there is an error initiating the run or if the Tower API returns an error response.

Examples:

# Run an app with default environment
run = tower.run_app("my-app")

# Run an app with custom environment and parameters
params = {"input_file": "data.csv", "output_dir": "results"}
run = tower.run_app("my-app", environment="prod", parameters=params)

wait_for_run

Waits for a Tower app run to reach a terminal state by polling the Tower API.

This function continuously polls the Tower API every 2 seconds (defined by WAIT_TIMEOUT) to check the status of the specified run. The function returns when the run reaches a terminal state (exited, errored, cancelled, or crashed).

Args:

  • run (Run): The Run object containing the app_slug and number of the run to monitor.
  • timeout (Optional[float]): Maximum time to wait in seconds before raising a TimeoutException. Defaults to one day (86,400 seconds).
  • raise_on_failure (bool): If True, raises a RunFailedError when the run fails. If False, returns the failed run object. Defaults to False.

Returns:

  • Run: The final state of the run after completion or failure.

Raises:

  • TimeoutException: If the specified timeout is reached before the run completes.
  • RunFailedError: If raise_on_failure is True and the run fails.
  • UnhandledRunStateException: If the run enters an unexpected state.
  • UnknownException: If there are persistent problems communicating with the Tower API.
  • NotFoundException: If the run cannot be found.
  • UnauthorizedException: If the API key is invalid or unauthorized.

Examples:

# Wait for a run to complete with default settings
run = tower.run_app("my-app")
final_run = tower.wait_for_run(run)

# Wait for a run with custom timeout and failure handling
run = tower.run_app("my-app")
try:
    final_run = tower.wait_for_run(run, timeout=3600, raise_on_failure=True)
except RunFailedError as e:
    print(f"Run failed: {e}")
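The polling behaviour described for wait_for_run can be approximated with a generic loop. This is a sketch under assumptions, not the SDK's code: `wait_for_status`, `RunFailed`, and the non-terminal status names `"pending"` and `"running"` are hypothetical, and the built-in `TimeoutError` stands in for TimeoutException.

```python
import time

TERMINAL_STATES = {"exited", "errored", "cancelled", "crashed"}
FAILED_STATES = {"errored", "cancelled", "crashed"}


class RunFailed(Exception):
    """Hypothetical stand-in for RunFailedError."""


def wait_for_status(fetch_status, timeout=86_400.0, poll_interval=2.0,
                    raise_on_failure=False):
    """Poll fetch_status() until it reports a terminal state."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATES:
            if raise_on_failure and status in FAILED_STATES:
                raise RunFailed(status)
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("run did not reach a terminal state in time")
        time.sleep(poll_interval)


# Simulated run: two non-terminal polls, then success.
statuses = iter(["pending", "running", "exited"])
final_status = wait_for_status(lambda: next(statuses), poll_interval=0.01)

# With raise_on_failure left at False, a failed state is simply returned.
failed_status = wait_for_status(lambda: "crashed", poll_interval=0.01)
```

The short `poll_interval` is only there to keep the simulation fast; the reference above states a 2-second interval.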

wait_for_runs

Waits for multiple Tower app runs to reach terminal states by polling the Tower API.

This function continuously polls the Tower API every 2 seconds (defined by WAIT_TIMEOUT) to check the status of all specified runs. The function returns when all runs reach terminal states (exited, errored, cancelled, or crashed).

Args:

  • runs (List[Run]): A list of Run objects to monitor.
  • timeout (Optional[float]): Maximum time to wait in seconds before raising a TimeoutException. Defaults to one day (86,400 seconds).
  • raise_on_failure (bool): If True, raises a RunFailedError when any run fails. If False, failed runs are returned in the failed_runs list. Defaults to False.

Returns:

  • tuple[List[Run], List[Run]]: A tuple containing two lists:
    • successful_runs: List of runs that completed successfully (status: 'exited')
    • failed_runs: List of runs that failed (status: 'crashed', 'cancelled', or 'errored')

Raises:

  • TimeoutException: If the specified timeout is reached before all runs complete.
  • RunFailedError: If raise_on_failure is True and any run fails.
  • UnhandledRunStateException: If a run enters an unexpected state.
  • UnknownException: If there are persistent problems communicating with the Tower API.
  • NotFoundException: If any run cannot be found.
  • UnauthorizedException: If the API key is invalid or unauthorized.

Examples:

# Wait for multiple runs to complete
runs = [tower.run_app("app1"), tower.run_app("app2")]
successful, failed = tower.wait_for_runs(runs)

# Wait for multiple runs with custom settings
runs = [tower.run_app("app1"), tower.run_app("app2")]
try:
    successful, failed = tower.wait_for_runs(runs, timeout=3600, raise_on_failure=True)
except RunFailedError as e:
    print(f"One or more runs failed: {e}")
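The shape of the returned tuple can be illustrated by partitioning final statuses the way the Returns section describes. `partition_runs` is a hypothetical helper that works on a mapping of app slug to final status instead of real Run objects.

```python
from typing import Dict, List, Tuple

FAILED_STATES = {"crashed", "cancelled", "errored"}


def partition_runs(final_statuses: Dict[str, str]) -> Tuple[List[str], List[str]]:
    """Split finished runs into (successful, failed) by their final status."""
    successful = [slug for slug, s in final_statuses.items() if s == "exited"]
    failed = [slug for slug, s in final_statuses.items() if s in FAILED_STATES]
    return successful, failed


successful, failed = partition_runs({
    "app1": "exited",
    "app2": "crashed",
    "app3": "cancelled",
})
```

With raise_on_failure left at False, checking whether the second list is empty is a simple way to decide if downstream steps should proceed.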