Working with Tables
This guide demonstrates Tower's table capabilities using two example apps:
- Writing Ticker Data to Iceberg - Shows how to create and write to tables
- Trimming Ticker Table - Shows how to read and delete data from tables
Creating and Writing to Tables
The first example app uses Yahoo Finance's public API and the yfinance library to:
- Download major indicators for stock tickers (e.g., 'AAPL')
- Track Open, Close, and Volume data
- Save the data in an Iceberg table named "daily_ticker_data"
Creating Tables
To create a table, follow these steps:
- Define the schema in Arrow Schema format
- Call the
create_if_not_exists()
method
SCHEMA = pa.schema([
("ticker", pa.string()),
("date", pa.string()),
("open", pa.float64()),
("close", pa.float64()),
("volume", pa.int64()),
])
table = tower
.tables("daily_ticker_data")
.create_if_not_exists(SCHEMA)
Using Different Catalogs and Namespaces
To create a table in a specific catalog and namespace:
mytable = tower
.tables('mytable', catalog='mycatalog', namespace='mynamespace')
.create_if_not_exists(SCHEMA)
If you're certain the table doesn't exist, use the create()
method:
mytable = tower
.tables('mytable', catalog='mycatalog', namespace='mynamespace')
.create(SCHEMA)
Writing to Tables
To write data to a table:
- Get a reference to the table using
load()
- Use either
upsert()
orinsert()
- Provide data as an Arrow Table
# Get table reference
table = tower.tables("daily_ticker_data").load()
Upserting Data
Use upsert()
to:
- Update existing rows that match join columns
- Insert new rows that don't have matches
table = table.upsert(data, join_cols=['ticker','date'])
Inserting Data
Use insert()
to simply add new rows:
table = table.insert(data)
Reading and Deleting Data
The second example app demonstrates how to:
- Inspect an Iceberg table
- Remove records older than a specified time window
Reading Data
Lazy Reading (Recommended)
For best performance and memory efficiency:
- Create a Polars LazyFrame using
to_polars()
- Build your query plan
- Execute with
collect()
when needed
# Create LazyFrame
df = table.to_polars()
# Build and execute query
ticker_stats = df.group_by("ticker").agg([
pl.count().alias("row_count"),
pl.col("date").str.to_date().max().alias("latest_date")
]).collect()
Eager Reading
For smaller tables or when you need all data in memory:
# Read entire table into memory
df = table.read()
Deleting Data
Use the delete()
method with a predicate:
cutoff_date_str = calculate_cutoff_date(df, time_window_days)
table.delete(f"date < '{cutoff_date_str}'")
You can provide predicates as:
- String expressions
- Lists of Arrow Expressions