"""Objects for storing multimodal data.
"""
import json
import sys
from abc import ABC, abstractmethod
from datetime import timedelta
from functools import reduce
from math import isnan
from typing import Any, Dict, List, Optional, TextIO, Tuple, Union
import numpy as np
import polars as pl
import srt
import yaml
from intervaltree import Interval, IntervalTree
from pydantic import (
BaseModel,
ConfigDict,
Field,
FilePath,
InstanceOf,
NonNegativeFloat,
NonNegativeInt,
PositiveFloat,
PositiveInt,
computed_field,
confloat,
create_model,
field_validator,
model_validator,
)
from pydantic.functional_validators import BeforeValidator
from typing_extensions import Annotated
"""Value that is returned if a feature is not present.
"""
[docs]
# Optional float constrained to be non-negative (NonNegativeFloat) and at most 1.0 (Field(le=1.0)).
ProbFloat = Annotated[Optional[NonNegativeFloat], Field(le=1.0)]
"""Probability float type.
Restricts the range to [0, 1].
"""
def _float_to_str(x: Union[Optional[float], Optional[str]]) -> Optional[str]:
if isinstance(x, str):
return x
if isinstance(x, (float, int)):
return str(int(x))
return None
[docs]
# Optional string type; `_float_to_str` runs before validation, so numeric
# inputs are converted to strings before the `Optional[str]` check is applied.
FloatToStr = Annotated[
    Optional[str],
    BeforeValidator(_float_to_str),
]
"""Convert floats or integers to strings.
Type that converts a float or integer to a string.
Returns `None` for other types than :class:`float`, :class:`int`, :class:`str`.
"""
def _nan_to_none(x: Optional[float]) -> Optional[confloat(allow_inf_nan=False)]:
    """Replace a NaN value by `None`.

    Parameters
    ----------
    x: float or None
        Value to check.

    Returns
    -------
    float or None
        `None` if `x` is `None` or NaN, otherwise `x` unchanged.

    """
    if x is None:
        return x
    return None if isnan(x) else x
[docs]
# Optional float that disallows inf/NaN (confloat(allow_inf_nan=False));
# `_nan_to_none` runs before validation and maps NaN inputs to None.
FloatOrNone = Annotated[
    Optional[confloat(allow_inf_nan=False)], BeforeValidator(_nan_to_none)
]
"""Convert nan float types to None types.
Type that converts a float that is nan into a None type.
Returns also None for None types.
"""
def _check_sorted(x: List):
if x == sorted(x):
return x
raise ValueError("Attribute must be in ascending order")
def _check_common_length(obj: BaseModel) -> Any:
    """Validate that all non-empty list fields are as long as `frame`.

    Parameters
    ----------
    obj: pydantic.BaseModel
        Model instance to check. Must have a `frame` attribute.

    Returns
    -------
    pydantic.BaseModel
        The unchanged model instance.

    Raises
    ------
    ValueError
        If a non-empty list field has a different length than `obj.frame`.

    """
    n_frames = len(obj.frame)
    # Read the field definitions from the class: accessing `model_fields` on an
    # instance is deprecated in recent pydantic releases.
    for name in type(obj).model_fields:
        value = getattr(obj, name)
        # Empty lists are unset optional features and are not checked
        if isinstance(value, list) and len(value) > 0 and len(value) != n_frames:
            raise ValueError(
                f"List attribute {name} must have the same length as 'frame'"
            )
    return obj
# Adapted from librosa package: https://github.com/librosa/librosa/blob/main/librosa/_typing.py
# Window specification: a string, a tuple of arbitrary items, or a float.
# NOTE(review): presumably mirrors the window arguments accepted by librosa/SciPy — confirm.
_Window = Union[str, Tuple[Any, ...], float]
class BaseData(BaseModel, ABC):
    """Abstract pydantic base model for the data attached to a single segment."""
class _BaseOutput(BaseModel, ABC):
    """Abstract pydantic base model shared by feature and annotation classes."""

    @staticmethod
    @abstractmethod
    def serialization_name() -> str:
        """Return the serialization name of the output class.

        Concrete subclasses override this with a short identifier
        (e.g., ``"video_annotation"``). The base implementation returns an
        empty string.

        """
        return ""
class BaseFeatures(_BaseOutput):
    """Base class for storing features.

    Attributes
    ----------
    filename: pydantic.FilePath
        Path to the video file. Must be a valid path.

    """

    filename: FilePath

    def __eq__(self, other: "BaseFeatures") -> bool:
        """Compare two feature objects field by field.

        Objects are comparable if their classes have the same name; they are
        equal if every field of `self` exists in `other` with an equal value.

        """
        # Compare the names by value: identity (`is`) of two equal strings is
        # an implementation detail and must not be relied upon.
        if self.__class__.__name__ != other.__class__.__name__:
            return NotImplemented

        # Field definitions are read from the classes because instance access
        # to `model_fields` is deprecated in recent pydantic releases.
        other_fields = type(other).model_fields

        for field in type(self).model_fields:
            if field not in other_fields or getattr(self, field) != getattr(
                other, field
            ):
                return False

        return True

    @classmethod
    def from_json(
        cls,
        filename: str,
        extra_filename: Optional[str] = None,
        encoding: str = "utf-8",
    ):
        """Load data from a JSON file.

        Parameters
        ----------
        filename: str
            Name of the JSON file from which the object should be loaded.
            Must have a .json ending.
        extra_filename: str, optional, default=None
            If not `None`, replaces the `filename` entry stored in the JSON file.
        encoding: str, default='utf-8'
            Encoding used to read the file.

        """
        with open(filename, "r", encoding=encoding) as file:
            data = json.load(file)

        if extra_filename is not None:
            data["filename"] = extra_filename

        return cls(**data)

    def write_json(self, filename: str, encoding: str = "utf-8"):
        """Store data in a JSON file.

        Arguments
        ---------
        filename: str
            Name of the destination file. Must have a .json ending.
        encoding: str, default='utf-8'
            Encoding used to write the file.

        """
        with open(filename, "w", encoding=encoding) as file:
            file.write(self.model_dump_json())
class BaseAnnotation(_BaseOutput):
    """Base class for storing annotations.

    Attributes
    ----------
    filename: pydantic.FilePath
        Name of annotated file. Must be a valid path.
    segments: intervaltree.IntervalTree, optional, default=None
        Interval tree containing :class:`intervaltree.Interval` annotation segments.
        Annotation data is stored in the :attr:`data` attribute of each :class:`intervaltree.Interval`.

    """

    # IntervalTree is not a pydantic-native type
    model_config = ConfigDict(arbitrary_types_allowed=True)

    filename: FilePath
    segments: Optional[InstanceOf[IntervalTree]] = None

    @staticmethod
    @abstractmethod
    def data_type() -> Any:
        """Return the model class used for the `data` of each interval."""

    @classmethod
    def from_json(
        cls,
        filename: str,
        extra_filename: Optional[str] = None,
        encoding: str = "utf-8",
    ):
        """Load data from a JSON file.

        Parameters
        ----------
        filename: str
            Name of the JSON file from which the object should be loaded.
            Must have a .json ending.
        extra_filename: str, optional, default=None
            If not `None`, replaces the `filename` entry stored in the JSON file.
        encoding: str, default='utf-8'
            Encoding used to read the file.

        """
        with open(filename, "r", encoding=encoding) as file:
            data = json.load(file)

        segment_model = cls.data_type()

        # A missing or null `segments` entry is loaded as an empty tree
        data["segments"] = IntervalTree(
            [
                Interval(
                    begin=seg["begin"],
                    end=seg["end"],
                    data=segment_model.model_validate(seg["data"]),
                )
                for seg in data.get("segments") or []
            ]
        )

        if extra_filename is not None:
            data["filename"] = extra_filename

        return cls(**data)

    def write_json(self, filename: str, encoding: str = "utf-8"):
        """Store data in a JSON file.

        Segments are written in sorted order so that repeated calls produce
        identical files.

        Arguments
        ---------
        filename: str
            Name of the destination file. Must have a .json ending.
        encoding: str, default='utf-8'
            Encoding used to write the file.

        """
        # Build the payload before opening the file so a serialization error
        # does not leave a truncated file behind. `exclude` takes a set.
        data = self.model_dump(mode="json", exclude={"segments"})
        data["segments"] = [
            {"begin": iv.begin, "end": iv.end, "data": iv.data.model_dump()}
            for iv in sorted(self.segments or ())
        ]

        with open(filename, "w", encoding=encoding) as file:
            json.dump(data, file)
class VideoAnnotation(BaseFeatures):
    """Video annotation class for storing facial features.

    Attributes
    ----------
    frame : typing.List[pydantic.NonNegativeInt], default=list()
        Index of each frame. Must be non-negative and in ascending order.
    time : typing.List[pydantic.NonNegativeFloat], default=list()
        Timestamp of each frame in seconds. Must be non-negative and in ascending order.
    face_box : typing.List[typing.Optional[typing.List[pydantic.NonNegativeFloat]]], optional, default=list()
        Bounding box of a detected face. Is `None` if no face was detected.
    face_prob : typing.List[ProbFloat], optional, default=list()
        Probability of a detected face. Is `None` if no face was detected.
    face_landmarks : typing.List[typing.Optional[typing.List[typing.List[pydantic.NonNegativeFloat]]], optional, default=list()
        Facial landmarks of a detected face. Is `None` if no face was detected.
    face_aus : typing.List[typing.Optional[typing.List[ProbFloat]]], optional, default=list()
        Facial action unit activations of a detected face. Is `None` if no face was detected.
    face_label : typing.List[FloatToStr], optional, default=list()
        Label of a detected face. Is `None` if no face was detected.
    face_embeddings : typing.List[typing.Optional[typing.List[float]]], optional, default=list()
        Embedding vector (list of 512 float elements) for each detected face in the input video.
    face_confidence : typing.List[ProbFloat], optional, default=list()
        Confidence of the `face_label` assignment. Is `None` if no face was detected or
        only one face label was assigned.
    face_average_embeddings : typing.Dict[FloatToStr, typing.List[float]], optional, default=dict()
        Average embedding vector (list of 512 float elements) for each face in the input video.

    """

    frame: List[NonNegativeInt] = Field(default_factory=list)
    time: List[NonNegativeFloat] = Field(default_factory=list)
    face_box: Optional[List[Optional[List[NonNegativeFloat]]]] = Field(
        default_factory=list
    )
    face_prob: Optional[List[ProbFloat]] = Field(default_factory=list)
    face_landmarks: Optional[
        List[Optional[List[List[NonNegativeFloat]]]]
    ] = Field(default_factory=list)
    face_aus: Optional[List[Optional[List[ProbFloat]]]] = Field(
        default_factory=list
    )
    face_label: Optional[List[FloatToStr]] = Field(default_factory=list)
    face_embeddings: Optional[List[Optional[List[float]]]] = Field(
        default_factory=list
    )
    face_confidence: Optional[List[ProbFloat]] = Field(default_factory=list)
    face_average_embeddings: Optional[Dict[FloatToStr, List[float]]] = Field(
        default_factory=dict
    )

    model_config = ConfigDict(validate_assignment=True)

    _check_sorted_frame = field_validator("frame", mode="after")(_check_sorted)
    _check_sorted_time = field_validator("time", mode="after")(_check_sorted)

    @field_validator("face_box", mode="after")
    @classmethod
    def _check_len_face_box(cls, v):
        """Require four coordinates for every face box that is not `None`."""
        if v is not None and any(len(e) != 4 for e in v if e is not None):
            raise ValueError("All face boxes must have four coordinates")
        return v

    @field_validator("face_landmarks", mode="after")
    @classmethod
    def _check_len_face_landmarks(cls, v):
        """Require (x, y) pairs for every landmark of every detected face."""
        if v is not None and any(
            len(b) != 2 for e in v if e is not None for b in e
        ):
            raise ValueError(
                "All face landmarks must have x and y coordinate pairs"
            )
        return v

    @model_validator(mode="after")
    def _check_finite(self) -> "VideoAnnotation":
        """Require that frames without a face box have no other face features.

        Only rows where the face box is `None` are checked; for these, the
        probability, landmarks, and action units must be `None` as well.

        """
        # The face fields are Optional: fall back to empty lists so that an
        # explicit `None` skips the check instead of raising a TypeError in zip
        for frm, box, prob, lmk, au in zip(
            self.frame,
            self.face_box or [],
            self.face_prob or [],
            self.face_landmarks or [],
            self.face_aus or [],
        ):
            if box is None and not (box == prob == lmk == au):
                raise ValueError(
                    f"Face boxes, probabilities, landmarks, and action units not all valid or invalid for frame {frm}"
                )
        return self

    _common_length = model_validator(mode="after")(_check_common_length)

    @model_validator(mode="after")
    def _check_face_labels(self) -> "VideoAnnotation":
        """Require an average embedding for every face label that is not `None`.

        The check is skipped if either the labels or the average embeddings
        are empty or `None`.

        """
        if not self.face_average_embeddings or not self.face_label:
            return self

        unique_labels = set(self.face_label)

        if all(
            lbl in self.face_average_embeddings
            for lbl in unique_labels
            if lbl is not None
        ):
            return self

        raise ValueError(
            f"Keys in 'face_average_embeddings' {self.face_average_embeddings.keys()} must be the same as unique values in 'face_label' {unique_labels}"
        )

    @staticmethod
    def serialization_name() -> str:
        """Return the serialization name of the class."""
        return "video_annotation"
class VoiceFeaturesConfig(BaseModel):
    """Configure the calculation of signal properties used for voice feature extraction.

    Create a pseudo-immutable object with attributes that are recognized by the
    :class:`VoiceExtractor` class and forwarded as arguments to signal property objects defined
    in :mod:`mexca.audio.features`. Details can be found in the feature class documentation.

    Attributes
    ----------
    frame_len: pydantic.PositiveInt, default=1024
        Number of samples per frame.
    hop_len: pydantic.PositiveInt, default=256
        Number of samples between frame starting points.
    center: bool, default=True
        Whether the signal has been centered and padded before framing.
    pad_mode: str, default='constant'
        How the signal has been padded before framing. See :func:`numpy.pad`.
        Uses the default value 0 for `'constant'` padding.
    spec_window: _Window, default="hann"
        The window that is applied before the STFT to obtain spectra.
    pitch_lower_freq: pydantic.NonNegativeFloat, default=75.0
        Lower limit used for pitch estimation (in Hz).
    pitch_upper_freq: pydantic.NonNegativeFloat, default=600.0
        Upper limit used for pitch estimation (in Hz).
    pitch_method: str, default="pyin"
        Method used for estimating voice pitch.
    pitch_n_harmonics: pydantic.PositiveInt, default=100
        Number of estimated pitch harmonics.
    pitch_pulse_lower_period: pydantic.PositiveFloat, default=0.0001
        Lower limit for periods between glottal pulses for jitter and shimmer extraction.
    pitch_pulse_upper_period: pydantic.PositiveFloat, default=0.02
        Upper limit for periods between glottal pulses for jitter and shimmer extraction.
    pitch_pulse_max_period_ratio: pydantic.PositiveFloat, default=1.3
        Maximum ratio between consecutive glottal periods for jitter and shimmer extraction.
    pitch_pulse_max_amp_factor: pydantic.PositiveFloat, default=1.6
        Maximum ratio between consecutive amplitudes used for shimmer extraction.
    jitter_rel: bool, default=True
        Divide jitter by the average pitch period.
    shimmer_rel: bool, default=True
        Divide shimmer by the average pulse amplitude.
    hnr_lower_freq: pydantic.PositiveFloat, default = 75.0
        Lower fundamental frequency limit for choosing pitch candidates when computing the harmonics-to-noise ratio (HNR).
    hnr_rel_silence_threshold: pydantic.PositiveFloat, default = 0.1
        Relative threshold for treating signal frames as silent when computing the HNR.
    formants_max: pydantic.PositiveInt, default=5
        The maximum number of formants that are extracted.
    formants_lower_freq: pydantic.NonNegativeFloat, default=50.0
        Lower limit for formant frequencies (in Hz).
    formants_upper_freq: pydantic.NonNegativeFloat, default=5450.0
        Upper limit for formant frequencies (in Hz).
    formants_signal_preemphasis_from: pydantic.NonNegativeFloat, optional, default=50.0
        Starting value for the applied preemphasis function (in Hz).
    formants_window: _Window, default="praat_gaussian"
        Window function that is applied before formant estimation.
    formants_amp_lower: pydantic.PositiveFloat, optional, default=0.8
        Lower boundary for formant peak amplitude search interval.
    formants_amp_upper: pydantic.PositiveFloat, optional, default=1.2
        Upper boundary for formant peak amplitude search interval.
    formants_amp_rel_f0: bool, optional, default=True
        Whether the formant amplitude is divided by the fundamental frequency amplitude.
    alpha_ratio_lower_band: tuple, default=(50.0, 1000.0)
        Boundaries of the alpha ratio lower frequency band (start, end) in Hz.
    alpha_ratio_upper_band: tuple, default=(1000.0, 5000.0)
        Boundaries of the alpha ratio upper frequency band (start, end) in Hz.
    hammar_index_pivot_point_freq: pydantic.PositiveFloat, default=2000.0
        Point separating the Hammarberg index lower and upper frequency regions in Hz.
    hammar_index_upper_freq: pydantic.PositiveFloat, default=5000.0
        Upper limit for the Hammarberg index upper frequency region in Hz.
    spectral_slopes_bands: tuple, default=((0.0, 500.0), (500.0, 1500.0))
        Frequency bands in Hz for which spectral slopes are estimated.
    mel_spec_n_mels: pydantic.PositiveInt, default=26
        Number of Mel filters.
    mel_spec_lower_freq: pydantic.NonNegativeFloat, default=20.0
        Lower frequency boundary for Mel spectrogram transformation in Hz.
    mel_spec_upper_freq: pydantic.NonNegativeFloat, default=8000.0
        Upper frequency boundary for Mel spectrogram transformation in Hz.
    mfcc_n: pydantic.PositiveInt, default=4
        Number of Mel frequency cepstral coefficients (MFCCs) that are estimated per frame.
    mfcc_lifter: pydantic.NonNegativeFloat, default=22.0
        Cepstral liftering coefficient for MFCC estimation. Must be >= 0. If zero, no liftering is applied.

    """

    frame_len: PositiveInt = 1024
    hop_len: PositiveInt = 256
    center: bool = True
    pad_mode: str = "constant"
    spec_window: _Window = "hann"
    pitch_lower_freq: NonNegativeFloat = 75.0
    pitch_upper_freq: NonNegativeFloat = 600.0
    pitch_method: str = "pyin"
    pitch_n_harmonics: PositiveInt = 100
    pitch_pulse_lower_period: PositiveFloat = 0.0001
    pitch_pulse_upper_period: PositiveFloat = 0.02
    pitch_pulse_max_period_ratio: PositiveFloat = 1.3
    pitch_pulse_max_amp_factor: PositiveFloat = 1.6
    jitter_rel: bool = True
    shimmer_rel: bool = True
    hnr_lower_freq: PositiveFloat = 75.0
    hnr_rel_silence_threshold: PositiveFloat = 0.1
    formants_max: PositiveInt = 5
    formants_lower_freq: NonNegativeFloat = 50.0
    formants_upper_freq: NonNegativeFloat = 5450.0
    # NOTE(review): documented as "optional"; accepting `None` here presumably
    # disables the preemphasis — confirm against the formant feature class.
    formants_signal_preemphasis_from: Optional[NonNegativeFloat] = 50.0
    formants_window: _Window = "praat_gaussian"
    formants_amp_lower: PositiveFloat = 0.8
    formants_amp_upper: PositiveFloat = 1.2
    formants_amp_rel_f0: bool = True
    alpha_ratio_lower_band: Tuple[NonNegativeFloat, NonNegativeFloat] = (
        50.0,
        1000.0,
    )
    alpha_ratio_upper_band: Tuple[NonNegativeFloat, NonNegativeFloat] = (
        1000.0,
        5000.0,
    )
    hammar_index_pivot_point_freq: PositiveFloat = 2000.0
    hammar_index_upper_freq: PositiveFloat = 5000.0
    spectral_slopes_bands: Tuple[
        Tuple[NonNegativeFloat, NonNegativeFloat],
        Tuple[NonNegativeFloat, NonNegativeFloat],
    ] = ((0.0, 500.0), (500.0, 1500.0))
    mel_spec_n_mels: PositiveInt = 26
    mel_spec_lower_freq: NonNegativeFloat = 20.0
    mel_spec_upper_freq: NonNegativeFloat = 8000.0
    mfcc_n: PositiveInt = 4
    mfcc_lifter: NonNegativeFloat = 22.0

    @classmethod
    def from_yaml(cls, filename: str):
        """Load a voice configuration object from a YAML file.

        Uses safe YAML loading (only supports native YAML but no Python tags).
        Converts loaded YAML sequences to tuples. An empty file yields the
        default configuration.

        Parameters
        ----------
        filename: str
            Path to the YAML file. Must have a .yml or .yaml ending.

        """
        with open(filename, "r", encoding="utf-8") as file:
            # `safe_load` returns None for an empty document
            config_dict = yaml.safe_load(file) or {}

        return cls(**config_dict)

    def write_yaml(self, filename: str):
        """Write a voice configuration object to a YAML file.

        Uses safe YAML dumping (only supports native YAML but no Python tags).

        Parameters
        ----------
        filename: str
            Path to the YAML file. Must have a .yml or .yaml ending.

        """
        with open(filename, "w", encoding="utf-8") as file:
            yaml.safe_dump(self.model_dump(), file)
class VoiceFeatures(BaseFeatures):
    """Class for storing voice features.

    Features are stored as lists (like columns of a data frame).
    Additional features are attached at runtime with :func:`add_feature`.

    Attributes
    ----------
    frame: typing.List[pydantic.NonNegativeInt]
        The frame index for which features were extracted. Must be non-negative and in ascending order.
    time: typing.List[pydantic.NonNegativeFloat]
        The time stamp at which features were extracted. Must be non-negative and in ascending order.

    """

    frame: List[NonNegativeInt]
    time: List[NonNegativeFloat]

    model_config = ConfigDict(validate_assignment=True)

    _check_sorted_frame = field_validator("frame", mode="after")(_check_sorted)
    _check_sorted_time = field_validator("time", mode="after")(_check_sorted)
    _common_length = model_validator(mode="after")(_check_common_length)

    @staticmethod
    def serialization_name() -> str:
        """Return the serialization name of the class."""
        return "voice_features"

    def add_feature(self, name: str, feature: List[float]):
        """Attach a new feature column to the object.

        Replaces the class of the instance by a dynamically created subclass
        that declares `name` as an additional list field, then assigns
        `feature` to it.

        Parameters
        ----------
        name: str
            Name of the new field.
        feature: typing.List[float]
            Feature values assigned to the new field.

        """
        field_spec = {name: (List[FloatOrNone], Field(default_factory=list))}
        extended_cls = create_model(
            "VoiceFeatures",
            __base__=(self.__class__,),
            **field_spec,
        )
        self.__class__ = extended_cls
        setattr(self, name, feature)

    @classmethod
    def from_json(
        cls,
        filename: str,
        extra_filename: Optional[str] = None,
        encoding: str = "utf-8",
    ):
        """Load data from a JSON file.

        The `filename`, `frame`, and `time` entries initialize the object; all
        remaining entries are attached as features via :func:`add_feature`.

        Parameters
        ----------
        filename: str
            Name of the JSON file from which the object should be loaded.
            Must have a .json ending.
        extra_filename: str, optional, default=None
            If not `None`, replaces the `filename` entry stored in the JSON file.
        encoding: str, default='utf-8'
            Encoding used to read the file.

        """
        with open(filename, "r", encoding=encoding) as file:
            payload = json.load(file)

        if extra_filename is not None:
            payload["filename"] = extra_filename

        # Split off the declared fields; what remains are custom features
        core = {key: payload.pop(key) for key in ("filename", "frame", "time")}
        instance = cls(**core)

        for feature_name, values in payload.items():
            instance.add_feature(feature_name, values)

        return instance
def _get_rttm_header() -> List[str]:
return [
"type",
"file",
"chnl",
"tbeg",
"tdur",
"ortho",
"stype",
"name",
"conf",
]
class SegmentData(BaseData):
    """Class for storing speech segment data.

    Attributes
    ----------
    name : str
        Speaker label.
    conf : ProbFloat, optional, default=None
        Confidence of speaker label.

    """

    name: str
    conf: Optional[ProbFloat] = None
class SpeakerAnnotation(BaseAnnotation):
    """Class for storing speaker and speech segment annotations.

    Attributes
    ----------
    filename : pydantic.FilePath
        Name of the annotated audio file. Must be a valid path.
    channel : int, optional
        Channel index.
    speaker_average_embeddings : typing.Dict[FloatToStr, List[float]], optional
        Average embedding vector for each speaker label.
    segments : intervaltree.IntervalTree, optional
        Stores speech segments as :class:`intervaltree.Interval`.
        Speaker labels are stored in :class:`SegmentData` objects in the :class:`data` attribute of each interval.

    """

    channel: Optional[int] = None
    speaker_average_embeddings: Optional[Dict[FloatToStr, List[float]]] = Field(
        default_factory=dict
    )

    @staticmethod
    def serialization_name() -> str:
        """Return the serialization name of the class."""
        return "audio_annotation"

    @staticmethod
    def data_type() -> Any:
        """Return the model class used for the `data` of each interval."""
        return SegmentData

    def __str__(
        self, end: str = "\t", file: TextIO = sys.stdout, header: bool = True
    ):
        """Print the annotation in RTTM layout and return an empty string.

        Segments are printed in sorted order. Missing values are printed as
        ``<NA>`` and floats are rounded to two decimals.

        Parameters
        ----------
        end: str, default='\\t'
            Separator printed after each cell.
        file: typing.TextIO, default=sys.stdout
            Stream to which the rows are printed.
        header: bool, default=True
            Whether a header row with the column names is printed first.

        """
        if header:
            for h in _get_rttm_header():
                print(h, end=end, file=file)

            print("", file=file)

        # Sort for a reproducible row order; tolerate a missing segment tree
        for seg in sorted(self.segments or ()):
            row = (
                "SPEAKER",
                self.filename,
                self.channel,
                seg.begin,
                seg.end - seg.begin,
                None,
                None,
                seg.data.name,
                seg.data.conf,
            )

            for col in row:
                if col is None:
                    print("<NA>", end=end, file=file)
                    continue

                if isinstance(col, float):
                    col = round(col, 2)

                print(col, end=end, file=file)

            print("", file=file)

        return ""

    @classmethod
    def from_pyannote(
        cls,
        annotation: "pyannote.core.Annotation",
        embeddings: Optional[Dict[str, List[float]]] = None,
    ):
        """Create a :class:`SpeakerAnnotation` object from a :class:`pyannote.core.Annotation` object.

        Parameters
        ----------
        annotation : pyannote.core.Annotation
            Annotation object containing speech segments and speaker labels.
        embeddings : typing.Dict[str, typing.List[float]], optional, default=None
            Stored in the `speaker_average_embeddings` attribute.

        """
        segments = [
            Interval(
                begin=seg.start,
                end=seg.end,
                data=SegmentData(name=str(spk)),
            )
            for seg, _, spk in annotation.itertracks(yield_label=True)
        ]

        return cls(
            filename=annotation.uri,
            channel=1,
            segments=IntervalTree(segments),
            speaker_average_embeddings=embeddings,
        )

    @classmethod
    def from_rttm(cls, filename: str, extra_filename: Optional[str] = None):
        """Load a speaker annotation from an RTTM file.

        Cells are separated by whitespace and ``<NA>`` cells are read as
        missing values. Blank lines are skipped. The file name and channel are
        taken from the last row.

        Parameters
        ----------
        filename : str
            Path to the file. Must have an RTTM ending.
        extra_filename : str, optional, default=None
            If not `None`, used as `filename` of the annotation instead of the
            file name stored in the RTTM file.

        Raises
        ------
        ValueError
            If the file contains no segments.

        """
        segments = []
        rttm_filename = None
        channel = None

        with open(filename, "r", encoding="utf-8") as file:
            for row in file:
                # Whitespace splitting drops the trailing newline and is
                # robust against tabs and repeated separators
                cells = [
                    None if cell == "<NA>" else cell for cell in row.split()
                ]

                if not cells:
                    continue

                begin = float(cells[3])
                segments.append(
                    Interval(
                        begin=begin,
                        end=begin + float(cells[4]),
                        data=SegmentData(name=cells[7]),
                    )
                )
                rttm_filename = cells[1]
                # The channel is written as <NA> when it is not set
                channel = None if cells[2] is None else int(cells[2])

        if not segments:
            raise ValueError(f"No segments found in RTTM file {filename}")

        return cls(
            filename=rttm_filename
            if extra_filename is None
            else extra_filename,
            channel=channel,
            segments=IntervalTree(segments),
        )

    # pylint: disable=unnecessary-dunder-call
    def write_rttm(self, filename: str):
        """Write a speaker annotation to an RTTM file.

        Parameters
        ----------
        filename : str
            Path to the file. Must have an RTTM ending.

        """
        with open(filename, "w", encoding="utf-8") as file:
            self.__str__(end=" ", file=file, header=False)
class TranscriptionData(BaseData):
    """Class for storing transcription data.

    Attributes
    ----------
    index: int
        Index of the transcribed sentence.
    text: str
        Transcribed text.
    speaker: str, optional, default=None
        Speaker of the transcribed text.
    confidence : ProbFloat, optional, default=None
        Average word probability of transcribed text.

    """

    index: int
    text: str
    speaker: Optional[str] = None
    confidence: Optional[ProbFloat] = None
class AudioTranscription(BaseAnnotation):
    """Class for storing audio transcriptions.

    Attributes
    ----------
    filename : pydantic.FilePath
        Name of the transcribed audio file. Must be a valid path.
    segments: intervaltree.IntervalTree, optional, default=None
        Interval tree containing the transcribed speech segments split into sentences as intervals.
        The transcribed sentences are stored in the `data` attribute of each interval.

    """

    @property
    def subtitles(self):
        """Deprecated alias for `segments`."""
        return self.segments

    @staticmethod
    def serialization_name() -> str:
        """Return the serialization name of the class."""
        return "transcription"

    @staticmethod
    def data_type() -> Any:
        """Return the model class used for the `data` of each interval."""
        return TranscriptionData

    @staticmethod
    def _split_speaker_tag(content: str) -> Tuple[Optional[str], str]:
        """Split subtitle content of the form ``<speaker> text`` into its parts.

        Only the first ``>`` terminates the speaker tag, so text that contains
        ``>`` is kept intact. The single space that :func:`write_srt` inserts
        after the tag is removed. Content without a leading tag is returned as
        text with speaker `None`.

        """
        if content.startswith("<"):
            tag, closing, text = content.partition(">")

            if closing:
                return tag[1:], text[1:] if text.startswith(" ") else text

        return None, content

    @classmethod
    def from_srt(cls, filename: str, extra_filename: Optional[str] = None):
        """Load an audio transcription from an SRT file.

        Parameters
        ----------
        filename: str
            Name of the file to be loaded. Must have an .srt ending.
        extra_filename: str, optional, default=None
            If not `None`, used as `filename` of the transcription instead of
            the name of the SRT file.

        """
        intervals = []

        with open(filename, "r", encoding="utf-8") as file:
            # The parser is consumed inside the `with` block because it reads
            # from the open file while iterating
            for sub in srt.parse(file):
                speaker, text = cls._split_speaker_tag(sub.content)
                intervals.append(
                    Interval(
                        begin=sub.start.total_seconds(),
                        end=sub.end.total_seconds(),
                        data=TranscriptionData(
                            index=sub.index,
                            text=text,
                            speaker=speaker,
                        ),
                    )
                )

        return cls(
            filename=filename if extra_filename is None else extra_filename,
            segments=IntervalTree(intervals),
        )

    def write_srt(self, filename: str):
        """Write an audio transcription to an SRT file

        Each subtitle is written as ``<speaker> text``.

        Parameters
        ----------
        filename: str
            Name of the file to write to. Must have an .srt ending.

        """
        segments = [
            srt.Subtitle(
                index=iv.data.index,
                start=timedelta(seconds=iv.begin),
                end=timedelta(seconds=iv.end),
                content=f"<{iv.data.speaker}> {iv.data.text}",
            )
            for iv in self.segments.all_intervals
        ]

        with open(filename, "w", encoding="utf-8") as file:
            file.write(srt.compose(segments))
class SentimentData(BaseData):
    """Class for storing sentiment data.

    Attributes
    ----------
    text: str
        Text of the sentence for which sentiment scores were predicted.
    pos: ProbFloat
        Positive sentiment score.
    neg: ProbFloat
        Negative sentiment score.
    neu: ProbFloat
        Neutral sentiment score.

    """

    text: str
    pos: ProbFloat
    neg: ProbFloat
    neu: ProbFloat
class SentimentAnnotation(BaseAnnotation):
    """Class for storing sentiment scores of transcribed sentences.

    Each interval of the interval tree carries the sentiment scores of one
    sentence in its `data` attribute.

    Attributes
    ----------
    filename : pydantic.FilePath
        Name of the file from which sentiment was extracted. Must be a valid path.

    """

    @staticmethod
    def data_type() -> Any:
        """Return the model class used for the `data` of each interval."""
        return SentimentData

    @staticmethod
    def serialization_name() -> str:
        """Return the serialization name of the class."""
        return "sentiment"
class Multimodal(BaseModel):
    """Class for storing multimodal features.

    See the :ref:`Output` section for details.

    Attributes
    ----------
    filename : pydantic.FilePath
        Name of the video file. Must be a valid path.
    duration : pydantic.NonNegativeFloat, optional, default=None
        Video duration in seconds.
    fps : pydantic.PositiveFloat
        Frames per second.
    fps_adjusted : pydantic.PositiveFloat
        Frames per seconds adjusted for skipped frames.
        Mostly needed for internal computations.
    video_annotation : VideoAnnotation
        Object containing facial features.
    audio_annotation : SpeakerAnnotation
        Object containing speech segments and speakers.
    voice_features : VoiceFeatures
        Object containing voice features.
    transcription : AudioTranscription
        Object containing transcribed speech segments split into sentences.
    sentiment : SentimentAnnotation
        Object containing sentiment scores for transcribed sentences.
    features : polars.LazyFrame
        Merged features stored in a :class:`polars.LazyFrame` object that uses lazy evaluation. To trigger evaluation
        the :func:`collect` method can be called.

    """

    filename: FilePath
    duration: Optional[NonNegativeFloat] = None
    fps: Optional[PositiveFloat] = None
    _fps_adjusted: Optional[PositiveFloat] = None
    video_annotation: Optional[VideoAnnotation] = None
    audio_annotation: Optional[SpeakerAnnotation] = None
    voice_features: Optional[VoiceFeatures] = None
    transcription: Optional[AudioTranscription] = None
    sentiment: Optional[SentimentAnnotation] = None
    features: Optional[pl.LazyFrame] = None

    model_config = ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    @computed_field
    @property
    def fps_adjusted(self) -> Optional[PositiveFloat]:
        """Adjusted frames per second; falls back to `fps` if not set explicitly."""
        return self.fps if self._fps_adjusted is None else self._fps_adjusted

    @fps_adjusted.setter
    def fps_adjusted(self, value: PositiveFloat):
        self._fps_adjusted = value

    def _merge_video_annotation(self, data_frames: List[pl.LazyFrame]):
        """Append the non-empty list fields of the video annotation as a frame."""
        if not self.video_annotation:
            return

        video_annotation_dict = self.video_annotation.model_dump()
        # The embeddings have a different dimension than the per-frame fields;
        # including them would make the conversion to a data frame fail
        del video_annotation_dict["face_average_embeddings"]
        del video_annotation_dict["face_embeddings"]

        # Only include list attributes that have length > 0
        data_frames.append(
            pl.LazyFrame(
                {
                    key: val
                    for (key, val) in video_annotation_dict.items()
                    if isinstance(val, list) and len(val) > 0
                }
            )
        )

    def _merge_audio_text_features(self, data_frames: List[pl.LazyFrame]):
        """Append speech segments, transcription, and sentiment as a frame.

        Builds one row per frame and overlapping speech segment. Transcribed
        spans (and their sentiment scores) are looked up at the same time
        points and left-joined onto the speech segments.

        """
        if not (self.audio_annotation and self.audio_annotation.segments):
            return

        has_text = bool(self.transcription and self.transcription.segments)
        # Sentiment is joined onto the transcription and is unused without it
        has_sentiment = has_text and bool(
            self.sentiment and self.sentiment.segments
        )

        audio_annotation_dict = {
            "frame": [],
            "segment_start": [],
            "segment_end": [],
            "segment_speaker_label": [],
        }
        text_features_dict = {
            "frame": [],
            "span_start": [],
            "span_end": [],
            "span_text": [],
            "segment_speaker_label": [],
            "span_confidence": [],  # store confidence of transcription accuracy
        }
        sentiment_dict = {
            "frame": [],
            "span_text": [],
            "span_sent_pos": [],
            "span_sent_neg": [],
            "span_sent_neu": [],
        }

        # NOTE(review): `numpy.arange` with a non-integer step and an integer
        # dtype is documented as unreliable; the frame grid below presumably
        # relies on `fps / fps_adjusted` being integer-valued — confirm.
        time = np.arange(
            0.0, self.duration, 1 / self.fps_adjusted, dtype=np.float32
        )
        frame = np.arange(
            0,
            self.duration * self.fps,
            self.fps / self.fps_adjusted,
            dtype=np.int32,
        )

        for i, t in zip(frame, time):
            overlap_segments = self.audio_annotation.segments[t]

            if len(overlap_segments) > 0:
                for seg in overlap_segments:
                    audio_annotation_dict["frame"].append(i)
                    audio_annotation_dict["segment_start"].append(seg.begin)
                    audio_annotation_dict["segment_end"].append(seg.end)
                    audio_annotation_dict["segment_speaker_label"].append(
                        str(seg.data.name)
                    )
            else:
                # Keep a row for frames without speech
                audio_annotation_dict["frame"].append(i)
                audio_annotation_dict["segment_start"].append(None)
                audio_annotation_dict["segment_end"].append(None)
                audio_annotation_dict["segment_speaker_label"].append(None)

            if has_text:
                for span in self.transcription.segments[t]:
                    text_features_dict["frame"].append(i)
                    text_features_dict["span_start"].append(span.begin)
                    text_features_dict["span_end"].append(span.end)
                    text_features_dict["span_text"].append(span.data.text)
                    text_features_dict["segment_speaker_label"].append(
                        span.data.speaker
                    )
                    text_features_dict["span_confidence"].append(
                        span.data.confidence
                    )  # store confidence of transcription accuracy

            if has_sentiment:
                for sent in self.sentiment.segments[t]:
                    sentiment_dict["frame"].append(i)
                    sentiment_dict["span_text"].append(sent.data.text)
                    sentiment_dict["span_sent_pos"].append(sent.data.pos)
                    sentiment_dict["span_sent_neg"].append(sent.data.neg)
                    sentiment_dict["span_sent_neu"].append(sent.data.neu)

        audio_text_features_df = pl.LazyFrame(audio_annotation_dict)

        if has_text:
            text_features_df = pl.LazyFrame(text_features_dict)

            if has_sentiment:
                text_features_df = text_features_df.join(
                    pl.LazyFrame(sentiment_dict),
                    on=["frame", "span_text"],
                    how="left",
                )

            audio_text_features_df = audio_text_features_df.join(
                text_features_df,
                on=["frame", "segment_speaker_label"],
                how="left",
            )

        data_frames.append(audio_text_features_df)

    def _merge_voice_features(self, data_frames: List[pl.LazyFrame]):
        """Append the dumped voice features as a frame."""
        if self.voice_features:
            data_frames.append(pl.LazyFrame(self.voice_features.model_dump()))

    @staticmethod
    def _delete_filename_time_col(df: pl.LazyFrame) -> pl.LazyFrame:
        """Drop the `time` and `filename` columns if they are present."""
        if "time" in df.columns:
            df = df.drop("time")
        if "filename" in df.columns:
            df = df.drop("filename")
        return df

    def merge_features(self) -> pl.LazyFrame:
        """Merge multimodal features from pipeline components into a common data frame.

        Transforms and merges the available output stored in the `Multimodal` object
        based on the `'frame'` variable. Stores the merged features as a
        :class:`polars.LazyFrame` in the `features` attribute. If no output is
        available, the `features` attribute is left unchanged.

        Returns
        -------
        polars.LazyFrame
            Merged multimodal features.

        """
        dfs: List[pl.LazyFrame] = []

        self._merge_video_annotation(data_frames=dfs)
        self._merge_audio_text_features(data_frames=dfs)
        self._merge_voice_features(data_frames=dfs)

        if dfs:
            # `filename` and `time` are removed from every component and
            # recreated once from `frame` after the join
            merged = reduce(
                lambda left, right: left.join(right, on=["frame"], how="left"),
                map(self._delete_filename_time_col, dfs),
            )
            self.features = merged.select(
                pl.lit(self.filename.as_posix()).alias("filename"),
                pl.col("frame").mul(1.0 / self.fps).alias("time"),
                pl.all(),
            )

        return self.features