#!/usr/bin/env python
import argparse
import subprocess
import tempfile
import warnings
from pathlib import Path
from types import MethodType
import matplotlib
import matplotlib.dates as mdates
import numpy as np
from matplotlib import font_manager
from matplotlib.animation import FuncAnimation
from matplotlib.figure import Figure
from matplotlib.gridspec import GridSpec
from matplotlib.offsetbox import AnchoredText
from matplotlib.ticker import ScalarFormatter
from obspy import UTCDateTime
from obspy.clients.fdsn import RoutingClient
from obspy.clients.fdsn.client import raise_on_error
from scipy import signal
from tqdm import tqdm
from . import __version__
# Add Tex Gyre Heros and JetBrains Mono to Matplotlib
for font_path in font_manager.findSystemFonts(
str(Path(__file__).resolve().parent / 'fonts')
):
font_manager.fontManager.addfont(font_path)
LOWEST_AUDIBLE_FREQUENCY = 20 # [Hz]
HIGHEST_AUDIBLE_FREQUENCY = 20000 # [Hz]
AUDIO_SAMPLE_RATE = 44100 # [Hz]
PAD = 60 # [s] Extra data to download on either side of requested time slice
# [px] Output video resolution options (width, height)
RESOLUTIONS = {
'crude': (640, 360),
'720p': (1280, 720),
'1080p': (1920, 1080),
'2K': (2560, 1440),
'4K': (3840, 2160),
}
FIGURE_WIDTH = 7.7 # [in] Sets effective font size, basically
# For spectrograms
REFERENCE_PRESSURE = 20e-6 # [Pa]
REFERENCE_VELOCITY = 1 # [m/s]
MS_PER_S = 1000 # [ms/s]
# Colorbar extension triangle height as proportion of colorbar length
EXTENDFRAC = 0.04
[docs]
def sonify(
network,
station,
channel,
starttime,
endtime,
location='*',
freqmin=None,
freqmax=None,
speed_up_factor=200,
fps=1,
resolution='4K',
output_dir=None,
spec_win_dur=5,
db_lim='smart',
log=False,
utc_offset=None,
):
r"""
Produce an animated spectrogram with a soundtrack derived from sped-up
seismic or infrasound data.
Args:
network (str): SEED network code
station (str): SEED station code
channel (str): SEED channel code
starttime (:class:`~obspy.core.utcdatetime.UTCDateTime`): Start time of
animation (UTC)
endtime (:class:`~obspy.core.utcdatetime.UTCDateTime`): End time of
animation (UTC)
location (str): SEED location code
freqmin (int or float): Lower bandpass corner [Hz] (defaults to 20 Hz /
`speed_up_factor`)
freqmax (int or float): Upper bandpass corner [Hz] (defaults to 20,000
Hz / `speed_up_factor` or the `Nyquist frequency`_, whichever is
smaller)
speed_up_factor (int): Factor by which to speed up the waveform data
(higher values = higher pitches)
fps (int): Frames per second of output video
resolution (str): Resolution of output video; one of `'crude'` (640
:math:`\times` 360), `'720p'` (1280 :math:`\times` 720), `'1080p'`
(1920 :math:`\times` 1080), `'2K'` (2560 :math:`\times` 1440), or
`'4K'` (3840 :math:`\times` 2160)
output_dir (str or :class:`~pathlib.Path`): Directory where output video
should be saved (defaults to :meth:`~pathlib.Path.cwd`)
spec_win_dur (int or float): Duration of spectrogram window [s]
db_lim (tuple or str): Tuple defining min and max colormap cutoffs [dB],
`'smart'` for a sensible automatic choice, or `None` for no clipping
log (bool): If `True`, use log scaling for :math:`y`-axis of spectrogram
utc_offset (int or float): If not `None`, convert UTC time to local time
using this offset [hours] before plotting
.. _Nyquist frequency: https://en.wikipedia.org/wiki/Nyquist_frequency
"""
# Capture args and format as string to store in movie metadata
key_value_pairs = [f'{k}={repr(v)}' for k, v in locals().items()]
call_str = 'sonify({})'.format(', '.join(key_value_pairs))
# Use current working directory if none provided
if not output_dir:
output_dir = Path().cwd()
output_dir = Path(str(output_dir)).expanduser().resolve()
if not output_dir.is_dir():
raise FileNotFoundError(f'Directory {output_dir} does not exist!')
# See https://service.iris.edu/irisws/fedcatalog/1/datacenters?format=html
client = RoutingClient('iris-federator')
print('Retrieving data...')
st = client.get_waveforms(
network=network,
station=station,
location=location,
channel=channel,
starttime=starttime - PAD,
endtime=endtime + PAD,
)
if not st:
raise_on_error(204, None) # If Stream is empty, then raise FDSNNoDataException
print('Done')
# Merge Traces with the same IDs
st.merge(fill_value='interpolate')
if st.count() != 1:
warnings.warn('Stream contains more than one Trace. Using first entry!')
for tr in st:
print(tr.id)
tr = st[0]
# Now that we have just one Trace, get inventory (which has response info)
inv = client.get_stations(
network=tr.stats.network,
station=tr.stats.station,
location=tr.stats.location,
channel=tr.stats.channel,
starttime=tr.stats.starttime,
endtime=tr.stats.endtime,
level='response',
)
# Adjust starttime so we have nice numbers in time box (carefully!)
offset = np.abs(tr.stats.starttime - (starttime - PAD)) # [s]
if offset > tr.stats.delta:
warnings.warn(
f'Difference between requested and actual starttime is {offset} s, '
f'which is larger than the data sample interval ({tr.stats.delta} s). '
'Not adjusting starttime of downloaded data; beware of inaccurate timing!'
)
else:
tr.stats.starttime = starttime - PAD
# Apply UTC offset if provided
if utc_offset is not None:
signed_offset = f'{utc_offset:{"+" if utc_offset else ""}g}'
print(f'Converting to local time using UTC offset of {signed_offset} hours')
utc_offset_sec = utc_offset * mdates.SEC_PER_HOUR
starttime += utc_offset_sec
endtime += utc_offset_sec
tr.stats.starttime += utc_offset_sec
# All infrasound sensors have a "?DF" channel pattern
if tr.stats.channel[1:3] == 'DF':
is_infrasound = True
rescale = 1 # No conversion
# All high-gain seismometers have a "?H?" channel pattern
elif tr.stats.channel[1] == 'H':
is_infrasound = False
rescale = 1e6 # Convert m to µm
# We can't figure out what type of sensor this is...
else:
raise ValueError(
f'Channel {tr.stats.channel} is not an infrasound or seismic channel!'
)
if not freqmax:
freqmax = np.min(
[tr.stats.sampling_rate / 2, HIGHEST_AUDIBLE_FREQUENCY / speed_up_factor]
)
if not freqmin:
freqmin = LOWEST_AUDIBLE_FREQUENCY / speed_up_factor
tr.remove_response(inventory=inv) # Units are m/s OR Pa after response removal
tr.detrend('demean')
tr.taper(max_percentage=None, max_length=PAD / 2) # Taper away some of PAD
print(f'Applying {freqmin:g}–{freqmax:g} Hz bandpass')
tr.filter('bandpass', freqmin=freqmin, freqmax=freqmax, zerophase=True)
# Make trimmed version
tr_trim = tr.copy()
tr_trim.trim(starttime, endtime)
# Create temporary directory for audio and video files
temp_dir = tempfile.TemporaryDirectory()
# MAKE AUDIO FILE
tr_audio = tr_trim.copy()
target_fs = AUDIO_SAMPLE_RATE / speed_up_factor
corner_freq = 0.4 * target_fs # [Hz] Note that Nyquist is 0.5 * target_fs
if corner_freq < tr_audio.stats.sampling_rate / 2: # To avoid ValueError
tr_audio.filter('lowpass', freq=corner_freq, corners=10, zerophase=True)
tr_audio.interpolate(sampling_rate=target_fs, method='lanczos', a=20)
tr_audio.taper(0.01) # For smooth start and end
audio_file = Path(temp_dir.name) / '47.wav'
print('Saving audio file...')
tr_audio.write(
str(audio_file),
format='WAV',
width=4,
rescale=True,
framerate=AUDIO_SAMPLE_RATE,
)
print('Done')
# MAKE VIDEO FILE
# We don't need an anti-aliasing filter here since we never use the values,
# just the timestamps
timing_tr = tr_trim.copy().interpolate(sampling_rate=fps / speed_up_factor)
times = timing_tr.times('UTCDateTime')[:-1] # Remove extra frame
# Define update function
def _march_forward(frame, spec_line, wf_line, time_box, wf_progress):
spec_line.set_xdata([times[frame].matplotlib_date])
wf_line.set_xdata([times[frame].matplotlib_date])
time_box.txt.set_text(times[frame].strftime('%H:%M:%S'))
tr_progress = tr.copy().trim(endtime=times[frame])
wf_progress.set_xdata(tr_progress.times('matplotlib'))
wf_progress.set_ydata(tr_progress.data * rescale)
# Store user's rc settings, then update font stuff
original_params = matplotlib.rcParams.copy()
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams['font.sans-serif'] = 'Tex Gyre Heros'
matplotlib.rcParams['mathtext.fontset'] = 'custom'
fig, *fargs = _spectrogram(
tr,
starttime,
endtime,
is_infrasound,
rescale,
spec_win_dur,
db_lim,
(freqmin, freqmax),
log,
utc_offset is not None,
resolution,
)
# Create animation
interval = ((1 / timing_tr.stats.sampling_rate) * MS_PER_S) / speed_up_factor
frames_tqdm = tqdm(
np.arange(times.size),
initial=1, # Frames start at 1
bar_format='{percentage:3.0f}% |{bar}| {n_fmt}/{total_fmt} frames ',
)
animation = FuncAnimation(
fig,
func=_march_forward,
frames=frames_tqdm,
fargs=fargs,
interval=interval,
)
video_file = Path(temp_dir.name) / '47.mp4'
tqdm.write('Saving animation. This may take a while...')
animation.save(
video_file,
dpi=RESOLUTIONS[resolution][0] / FIGURE_WIDTH, # Can be a float...
)
frames_tqdm.close()
print('Done')
# Restore user's rc settings, ignoring Matplotlib deprecation warnings
with warnings.catch_warnings():
warnings.simplefilter('ignore')
matplotlib.rcParams.update(original_params)
# MAKE COMBINED FILE
tr_id_str = '_'.join([code for code in tr.id.split('.') if code])
output_file = output_dir / f'{tr_id_str}_{speed_up_factor}x.mp4'
_ffmpeg_combine(audio_file, video_file, output_file, call_str)
# Clean up temporary directory, just to be safe
temp_dir.cleanup()
def _spectrogram(
tr,
starttime,
endtime,
is_infrasound,
rescale,
spec_win_dur,
db_lim,
freq_lim,
log,
is_local_time,
resolution,
):
"""
Make a combination waveform and spectrogram plot for an infrasound or
seismic signal.
Args:
tr (:class:`~obspy.core.trace.Trace`): Input data, usually starts
before `starttime` and ends after `endtime` (this function expects
the response to be removed!)
starttime (:class:`~obspy.core.utcdatetime.UTCDateTime`): Start time
endtime (:class:`~obspy.core.utcdatetime.UTCDateTime`): End time
is_infrasound (bool): `True` if infrasound, `False` if seismic
rescale (int or float): Scale waveforms by this factor for plotting
spec_win_dur (int or float): See docstring for :func:`~sonify.sonify`
db_lim (tuple or str): See docstring for :func:`~sonify.sonify`
freq_lim (tuple): Tuple defining frequency limits for spectrogram plot
log (bool): See docstring for :func:`~sonify.sonify`
is_local_time (bool): Passed to :class:`_UTCDateFormatter`
resolution (str): See docstring for :func:`~sonify.sonify`
Returns:
Tuple of (`fig`, `spec_line`, `wf_line`, `time_box`, `wf_progress`)
"""
if is_infrasound:
ylab = 'Pressure (Pa)'
clab = f'Power (dB rel. [{REFERENCE_PRESSURE * 1e6:g} µPa]$^2$ Hz$^{{-1}}$)'
ref_val = REFERENCE_PRESSURE
else:
ylab = 'Velocity (µm s$^{-1}$)'
if REFERENCE_VELOCITY == 1:
clab = (
f'Power (dB rel. {REFERENCE_VELOCITY:g} [m s$^{{-1}}$]$^2$ Hz$^{{-1}}$)'
)
else:
clab = (
f'Power (dB rel. [{REFERENCE_VELOCITY:g} m s$^{{-1}}$]$^2$ Hz$^{{-1}}$)'
)
ref_val = REFERENCE_VELOCITY
fs = tr.stats.sampling_rate
nperseg = int(spec_win_dur * fs) # Samples
nfft = np.power(2, int(np.ceil(np.log2(nperseg))) + 1) # Pad fft with zeroes
f, t, sxx = signal.spectrogram(
tr.data, fs, window='hann', nperseg=nperseg, noverlap=nperseg // 2, nfft=nfft
)
# [dB rel. (ref_val <ref_val_unit>)^2 Hz^-1]
sxx_db = 10 * np.log10(sxx / (ref_val**2))
t_mpl = tr.stats.starttime.matplotlib_date + (t / mdates.SEC_PER_DAY)
# Ensure a 16:9 aspect ratio
fig = Figure(figsize=(FIGURE_WIDTH, (9 / 16) * FIGURE_WIDTH))
# width_ratios effectively controls the colorbar width
gs = GridSpec(2, 2, figure=fig, height_ratios=[2, 1], width_ratios=[40, 1])
spec_ax = fig.add_subplot(gs[0, 0])
wf_ax = fig.add_subplot(gs[1, 0], sharex=spec_ax) # Share x-axis with spec
cax = fig.add_subplot(gs[0, 1])
wf_lw = 0.5
wf_ax.plot(tr.times('matplotlib'), tr.data * rescale, '#b0b0b0', linewidth=wf_lw)
wf_progress = wf_ax.plot(np.nan, np.nan, 'black', linewidth=wf_lw)[0]
wf_ax.set_ylabel(ylab)
wf_ax.grid(linestyle=':')
max_value = np.abs(tr.copy().trim(starttime, endtime).data).max() * rescale
wf_ax.set_ylim(-max_value, max_value)
im = spec_ax.pcolormesh(
t_mpl, f, sxx_db, cmap='inferno', shading='nearest', rasterized=True
)
spec_ax.set_ylabel('Frequency (Hz)')
spec_ax.grid(linestyle=':')
spec_ax.set_ylim(freq_lim)
if log:
spec_ax.set_yscale('log')
# Tick locating and formatting
locator = mdates.AutoDateLocator()
wf_ax.xaxis.set_major_locator(locator)
wf_ax.xaxis.set_major_formatter(_UTCDateFormatter(locator, is_local_time))
fig.autofmt_xdate()
# "Crop" x-axis!
wf_ax.set_xlim(starttime.matplotlib_date, endtime.matplotlib_date)
# Initialize animated stuff
line_kwargs = dict(x=starttime.matplotlib_date, color='forestgreen', linewidth=1)
spec_line = spec_ax.axvline(**line_kwargs)
wf_line = wf_ax.axvline(ymin=0.01, clip_on=False, zorder=10, **line_kwargs)
time_box = AnchoredText(
s=starttime.strftime('%H:%M:%S'),
pad=0.2,
loc='lower right',
bbox_to_anchor=[1, 1],
bbox_transform=wf_ax.transAxes,
borderpad=0,
prop=dict(color='forestgreen'),
)
offset_px = -0.0025 * RESOLUTIONS[resolution][1] # Resolution-independent!
time_box.txt._text.set_y(offset_px) # [pixels] Vertically center text
time_box.zorder = 12 # This should place it on the very top; see below
time_box.patch.set_linewidth(matplotlib.rcParams['axes.linewidth'])
wf_ax.add_artist(time_box)
# Adjustments to ensure time marker line is zordered properly
# 9 is below marker; 11 is above marker
spec_ax.spines['bottom'].set_zorder(9)
wf_ax.spines['top'].set_zorder(9)
for side in 'bottom', 'left', 'right':
wf_ax.spines[side].set_zorder(11)
# Pick smart limits rounded to nearest 10
if db_lim == 'smart':
db_min = np.percentile(sxx_db, 20)
db_max = sxx_db.max()
db_lim = (np.ceil(db_min / 10) * 10, np.floor(db_max / 10) * 10)
# Clip image to db_lim if provided (doesn't clip if db_lim=None)
im.set_clim(db_lim)
# Automatically determine whether to show triangle extensions on colorbar
# (kind of adopted from xarray)
if db_lim:
min_extend = sxx_db.min() < db_lim[0]
max_extend = sxx_db.max() > db_lim[1]
else:
min_extend = False
max_extend = False
if min_extend and max_extend:
extend = 'both'
elif min_extend:
extend = 'min'
elif max_extend:
extend = 'max'
else:
extend = 'neither'
fig.colorbar(im, cax, extend=extend, extendfrac=EXTENDFRAC, label=clab)
spec_ax.set_title(tr.id, family='JetBrains Mono')
fig.tight_layout()
fig.subplots_adjust(hspace=0, wspace=0.05)
# Finnicky formatting to get extension triangles (if they exist) to extend
# above and below the vertical extent of the spectrogram axes
pos = cax.get_position()
triangle_height = EXTENDFRAC * pos.height
ymin = pos.ymin
height = pos.height
if min_extend and max_extend:
ymin -= triangle_height
height += 2 * triangle_height
elif min_extend and not max_extend:
ymin -= triangle_height
height += triangle_height
elif max_extend and not min_extend:
height += triangle_height
else:
pass
cax.set_position([pos.xmin, ymin, pos.width, height])
# Move offset text around and format it more nicely, see
# https://github.com/matplotlib/matplotlib/blob/710fce3df95e22701bd68bf6af2c8adbc9d67a79/lib/matplotlib/ticker.py#L677
magnitude = wf_ax.yaxis.get_major_formatter().orderOfMagnitude
if magnitude: # I.e., if offset text is present
wf_ax.yaxis.get_offset_text().set_visible(False) # Remove original text
sf = ScalarFormatter(useMathText=True)
sf.orderOfMagnitude = magnitude # Formatter needs to know this!
sf.locs = [47] # Can't be an empty list
wf_ax.text(
0.002,
0.95,
sf.get_offset(), # Let the ScalarFormatter do the formatting work
transform=wf_ax.transAxes,
ha='left',
va='top',
)
return fig, spec_line, wf_line, time_box, wf_progress
def _ffmpeg_combine(audio_file, video_file, output_file, call_str):
"""
Combine audio and video files into a single movie. Uses a system call to
`FFmpeg`_.
Args:
audio_file (:class:`~pathlib.Path`): Audio file to use
video_file (:class:`~pathlib.Path`): Video file to use
output_file (:class:`~pathlib.Path`): Output file (full path)
call_str (str): Formatted record of sonify call to add to metadata
.. _FFmpeg: https://www.ffmpeg.org/
"""
args = [
'ffmpeg',
'-y',
'-v',
'warning',
'-i',
video_file,
'-guess_layout_max',
'0',
'-i',
audio_file,
'-c:v',
'copy',
'-c:a',
'aac',
'-b:a',
'320k',
'-ac',
'2',
'-metadata',
f'artist=sonify, rev. {__version__}',
'-metadata',
f'comment={call_str}',
output_file,
]
print('Combining video and audio using FFmpeg...')
code = subprocess.call(args)
if code == 0:
print(f'Video saved as {output_file}')
else:
output_file.unlink(missing_ok=True) # Remove file if it was made
raise OSError(
'Issue with FFmpeg conversion. Check error messages and try again.'
)
# Subclass ConciseDateFormatter (modifies __init__() and set_axis() methods)
class _UTCDateFormatter(mdates.ConciseDateFormatter):
def __init__(self, locator, is_local_time):
super().__init__(locator)
# Determine proper time label (local time or UTC)
if is_local_time:
time_type = 'Local'
else:
time_type = 'UTC'
# Re-format datetimes
self.formats[1] = '%B'
self.zero_formats[2:4] = ['%B', '%B %d']
self.offset_formats = [
f'{time_type} time',
f'{time_type} time in %Y',
f'{time_type} time in %B %Y',
f'{time_type} time on %B %d, %Y',
f'{time_type} time on %B %d, %Y',
f'{time_type} time on %B %d, %Y at %H:%M',
]
def set_axis(self, axis):
self.axis = axis
# If this is an x-axis (usually is!) then center the offset text
if self.axis.axis_name == 'x':
offset = self.axis.get_offset_text()
offset.set_horizontalalignment('center')
offset.set_x(0.5)
def main():
"""
This function is run when ``sonify.py`` is called as a script. It's also set
up as an entry point.
"""
parser = argparse.ArgumentParser(
description='Produce an animated spectrogram with a soundtrack derived from sped-up seismic or infrasound data.',
allow_abbrev=False,
)
# Hack the printing function of the parser to fix --db_lim option formatting
def _print_message_replace(self, message, file=None):
if message:
if file is None:
file = _sys.stderr
file.write(message.replace('[DB_LIM ...]', '[DB_LIM]'))
parser._print_message = MethodType(_print_message_replace, parser)
parser.add_argument(
'-v',
'--version',
action='version',
version=f'{parser.prog}, rev. {__version__}',
help=f'show revision number and exit',
)
parser.add_argument('network', help='SEED network code')
parser.add_argument('station', help='SEED station code')
parser.add_argument('channel', help='SEED channel code')
parser.add_argument(
'starttime',
type=UTCDateTime,
help='start time of animation (UTC), format yyyy-mm-ddThh:mm:ss',
)
parser.add_argument(
'endtime',
type=UTCDateTime,
help='end time of animation (UTC), format yyyy-mm-ddThh:mm:ss',
)
parser.add_argument('--location', default='*', help='SEED location code')
parser.add_argument(
'--freqmin',
default=None,
type=float,
help='lower bandpass corner [Hz] (defaults to 20 Hz / "SPEED_UP_FACTOR")',
)
parser.add_argument(
'--freqmax',
default=None,
type=float,
help='upper bandpass corner [Hz] (defaults to 20,000 Hz / "SPEED_UP_FACTOR" or the Nyquist frequency, whichever is smaller)',
)
parser.add_argument(
'--speed_up_factor',
default=200,
type=int,
help='factor by which to speed up the waveform data (higher values = higher pitches)',
)
parser.add_argument(
'--fps', default=1, type=int, help='frames per second of output video'
)
parser.add_argument(
'--resolution',
default='4K',
choices=RESOLUTIONS.keys(),
help='resolution of output video; one of "crude" (640 x 360), "720p" (1280 x 720), "1080p" (1920 x 1080), "2K" (2560 x 1440), or "4K" (3840 x 2160)',
)
parser.add_argument(
'--output_dir',
default=None,
help='directory where output video should be saved (defaults to current working directory)',
)
parser.add_argument(
'--spec_win_dur',
default=5,
type=float,
help='duration of spectrogram window [s]',
)
parser.add_argument(
'--db_lim',
default='smart',
nargs='+',
help='numbers "<min>" "<max>" defining min and max colormap cutoffs [dB], "smart" for a sensible automatic choice, or "None" for no clipping',
)
parser.add_argument(
'--log',
action='store_true',
help='use log scaling for y-axis of spectrogram',
)
parser.add_argument(
'--utc_offset',
default=None,
type=float,
help='if provided, convert UTC time to local time using this offset [hours] before plotting',
)
input_args = parser.parse_args()
# Extra type check for db_lim kwarg
db_lim_error = False
db_lim = np.atleast_1d(input_args.db_lim)
if db_lim.size == 1:
db_lim = db_lim[0]
if db_lim == 'smart':
pass
elif db_lim == 'None':
db_lim = None
else:
db_lim_error = True
elif db_lim.size == 2:
try:
db_lim = tuple(float(s) for s in db_lim)
except ValueError:
db_lim_error = True
else: # User provided more than 2 args
db_lim_error = True
if db_lim_error:
parser.error(
'argument --db_lim: must be one of "smart", "None", or two numeric values "<min>" "<max>"'
)
sonify(
input_args.network,
input_args.station,
input_args.channel,
input_args.starttime,
input_args.endtime,
input_args.location,
input_args.freqmin,
input_args.freqmax,
input_args.speed_up_factor,
input_args.fps,
input_args.resolution,
input_args.output_dir,
input_args.spec_win_dur,
db_lim,
input_args.log,
input_args.utc_offset,
)
if __name__ == '__main__':
main()