Package basictdf

basictdf is a read and write parser for the BTS Bioengineering TDF file format. This format is typically used to store raw data from a BTS motion capture acquisition system (e.g. raw EMG, 2D raw marker data), but it can also store processed data (e.g. 3D reconstructed marker data, filtered EMG signals, events).

How to read a TDF file?

from basictdf import Tdf

# You can use a context manager to automatically close the file
with Tdf("path/to/file.tdf") as tdf:
    data3D = tdf.data3D

# Or you can read individual blocks.
data3D = Tdf("path/to/file.tdf").data3D
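
Once a block is loaded, its tracks can be fetched by position or by label. The sketch below imitates that lookup with plain Python objects; `LabelledTracks` and `ToyTrack` are hypothetical names used for illustration and are not part of basictdf.

```python
class ToyTrack:
    """Hypothetical stand-in for a marker track; only carries a label."""

    def __init__(self, label):
        self.label = label


class LabelledTracks:
    """Hypothetical container that supports lookup by index or by label."""

    def __init__(self, tracks):
        self._tracks = list(tracks)

    def __getitem__(self, key):
        if isinstance(key, int):
            return self._tracks[key]
        if isinstance(key, str):
            for track in self._tracks:
                if track.label == key:
                    return track
            raise KeyError(f"Track with label {key} not found")
        raise TypeError(f"Invalid key type {type(key)}")

    def __contains__(self, value):
        if isinstance(value, str):
            return any(track.label == value for track in self._tracks)
        return value in self._tracks

    def __len__(self):
        return len(self._tracks)


block = LabelledTracks([ToyTrack("c7"), ToyTrack("sacrum")])
first = block[0]          # lookup by position
by_label = block["c7"]    # lookup by label
has_c7 = "c7" in block
has_t10 = "t10" in block
```

Both lookups return the same object here, and a missing label raises `KeyError` rather than returning `None`.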

How to modify a TDF file?

The proper way to modify a TDF file is through a context manager, which guarantees that the file is closed properly. The file must also be opened in read-and-write mode ("r+b"), which is done by calling the Tdf.allow_write() method.

You can't write to a TDF file like this:

# Don't do this. This will fail
Tdf("path/to/file.tdf").data3D = oldData3D

Use a context manager instead:

# Do this instead
with Tdf("path/to/file.tdf").allow_write() as tdf:
    oldData3D = tdf.data3D

    # Let's add 1 to the X coordinate of the c7 marker
    oldData3D["c7"].X = oldData3D["c7"].X + 1

    tdf.data3D = oldData3D

# This is fine too
tdf = Tdf("path/to/file.tdf")

with tdf.allow_write() as tdf:
    tdf.data3D = oldData3D
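
Why the bare assignment fails is easier to see with a toy model. The sketch below is not basictdf's implementation; `ToyFile` and its methods are hypothetical, and it assumes only what the text states: files are read-only by default, and `allow_write()` switches the mode to "r+b" before the `with` block opens the file.

```python
import os
import tempfile


class ToyFile:
    """Hypothetical file wrapper; not the basictdf API."""

    def __init__(self, path):
        self.path = path
        self.mode = "rb"  # read-only unless allow_write() is called
        self.handle = None

    def allow_write(self):
        self.mode = "r+b"  # read and write, without truncating
        return self  # returning self lets the call chain into `with`

    def __enter__(self):
        self.handle = open(self.path, self.mode)
        return self

    def __exit__(self, exc_type, exc, tb):
        self.handle.close()  # runs even if the block raised
        self.handle = None

    def write_first_byte(self, value):
        if self.handle is None or self.mode != "r+b":
            raise PermissionError("call allow_write() and use a with block")
        self.handle.seek(0)
        self.handle.write(bytes([value]))


# Create a scratch file so the example is self-contained
fd, path = tempfile.mkstemp()
os.write(fd, b"\x00\x01\x02")
os.close(fd)

try:
    ToyFile(path).write_first_byte(9)  # no context manager, no write mode
    failed = False
except PermissionError:
    failed = True

with ToyFile(path).allow_write() as f:
    f.write_first_byte(9)

with open(path, "rb") as raw:
    contents = raw.read()
os.remove(path)
```

Returning `self` from `allow_write()` is what makes both spellings shown above work: chaining the call directly into `with`, or calling it on an object created earlier.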

How to add a new block to a TDF file?

How to create a TDF from scratch?

The easiest way to add a new block to a TDF is actually to modify an existing block with the same type. However, if you want to add a completely new block, you can use any of the Block subclasses, like Data3D or TemporalEventsData.

from basictdf import Tdf, Data3D, MarkerTrack
import numpy as np

# Create an empty data3D block
data3D = Data3D(
    frequency=1000,
    nFrames=1000,
    volume=np.array([1, 1, 1]),
    translationVector=np.array([0, 0, 0]),
    rotationMatrix=np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]]),
)

# Create a bogus marker track with random data
c7 = MarkerTrack("c7", np.random.rand(1000, 3))

# Add it to the data3D block
data3D.add_track(c7)

# Write it to a new TDF file
with Tdf.new("my_file.tdf").allow_write() as tdf:
    tdf.data3D = data3D

# This works too
tdf = Tdf.new("my_file.tdf")
with tdf.allow_write() as tdf:
    tdf.data3D = data3D
from __future__ import annotations

__doc__ = """
basictdf is a **read** and **write** parser for the BTS Bioengineering TDF file
format.
This format is typically used as storage of raw data from a BTS motion capture
 acquisition system (e.g. raw EMG, 2D raw marker data) but can also serve as
storage for processed data
(e.g. 3D reconstructed marker data, filtered EMG signals, events).

## How to read a TDF file?

```python
from basictdf import Tdf

# You can use a context manager to automatically close the file
with Tdf("path/to/file.tdf") as tdf:
    data3D = tdf.data3D

# Or you can read individual blocks.
data3D = Tdf("path/to/file.tdf").data3D
```

## How to modify a TDF file?

The proper way to modify a TDF file is through a context manager.
This way, the file is guaranteed to be closed properly.
The file must also be opened in read-and-write mode ("r+b"),
which is done by calling the `basictdf.Tdf.allow_write` method.


You **can't** write to a TDF file like this:

```python
# Don't do this. This will fail
Tdf("path/to/file.tdf").data3D = oldData3D
```
Use a context manager instead:

```python
# Do this instead
with Tdf("path/to/file.tdf").allow_write() as tdf:
    oldData3D = tdf.data3D

    # Let's add 1 to the X coordinate of the c7 marker
    oldData3D["c7"].X = oldData3D["c7"].X + 1

    tdf.data3D = oldData3D

# This is fine too
tdf = Tdf("path/to/file.tdf")

with tdf.allow_write() as tdf:
    tdf.data3D = oldData3D
```


## How to add a new block to a TDF file?
## How to create a TDF from scratch?

The easiest way to add a new block to a TDF is actually to modify an existing
block with the same type. However, if you want to add a completely new block,
 you can use any of the `basictdf.tdfBlock.Block` subclasses, like
 `basictdf.tdfData3D.Data3D` or `basictdf.tdfEvents.TemporalEventsData`.

```python
from basictdf import Tdf, Data3D, MarkerTrack
import numpy as np

# Create an empty data3D block
data3D = Data3D(
    frequency=1000,
    nFrames=1000,
    volume=np.array([1, 1, 1]),
    translationVector=np.array([0, 0, 0]),
    rotationMatrix=np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]]),
)

# Create a bogus marker track with random data
c7 = MarkerTrack("c7", np.random.rand(1000, 3))

# Add it to the data3D block
data3D.add_track(c7)

# Write it to a new TDF file
with Tdf.new("my_file.tdf").allow_write() as tdf:
    tdf.data3D = data3D

# This works too
tdf = Tdf.new("my_file.tdf")
with tdf.allow_write() as tdf:
    tdf.data3D = data3D
```
"""
from basictdf.basictdf import Tdf
from basictdf.tdfData3D import Data3D, MarkerTrack
from basictdf.tdfEMG import EMG, EMGTrack
from basictdf.tdfEvents import Event, EventsDataType, TemporalEventsData

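# NOTE: ForcePlatformsDataBlock and ForcePlatformData are listed below but are
# not imported above, so `from basictdf import *` would fail on them as written.
__all__ = [
    "Tdf",
    "Data3D",
    "MarkerTrack",
    "TemporalEventsData",
    "Event",
    "EventsDataType",
    "EMG",
    "EMGTrack",
    "ForcePlatformsDataBlock",
    "ForcePlatformData",
]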


__pdoc__ = {}
__pdoc__["basictdf.tdfUtils"] = False
__pdoc__["basictdf.tdfTypes"] = False
__pdoc__["collections.ABC"] = False

Sub-modules

basictdf.basictdf
basictdf.tdfBlock

Block and block type classes.

basictdf.tdfCalibrationData

The TDF Calibration Data block

basictdf.tdfData2D

Data2D module

basictdf.tdfData3D

Marker data module.

basictdf.tdfEMG

EMG data module …

basictdf.tdfEvents

Events data module.

basictdf.tdfForce3D

Force, torque and acceleration data module.

basictdf.tdfForcePlatformsCalibration

Force platform calibration data module.

basictdf.tdfForcePlatformsData

Force platform data module.

basictdf.tdfOpticalSystem

Optical System Configuration Data module.

Classes

class Data3D (frequency: float, nFrames: int, volume: numpy.ndarray, rotationMatrix: numpy.ndarray, translationVector: numpy.ndarray, startTime: float = 0.0, flag: Flags = Flags.rawData, format: Data3dBlockFormat = Data3dBlockFormat.byTrack)

A class to represent a TDF block.

A data block that contains marker tracks.

Args

frequency : float
data frequency in Hz
nFrames : int
number of frames in the data block
volume : Union[Volume,np.ndarray]
acquisition volume
rotationMatrix : np.ndarray
rotation matrix
translationVector : np.ndarray
translation vector
startTime : float, optional
Acquisition start time. Defaults to 0.0.
flag : Flags, optional
Data 3D block flags. Defaults to Flags.rawData.
format : Data3dBlockFormat, optional
Data3D format. Defaults to Data3dBlockFormat.byTrack.

Raises

ValueError
the volume is not an array of 3 floats
ValueError
the rotation matrix is not an array of 3x3 floats
ValueError
the translation vector is not an array of 3 floats
TypeError
flag is not a Flags member
TypeError
nFrames is not an integer
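
The `ValueError` cases amount to shape checks on the inputs. A rough stdlib-only sketch of that kind of check follows, using nested lists instead of NumPy arrays; `shape_of` and `check_shape` are hypothetical helpers, not part of basictdf, and `shape_of` assumes the nested list is rectangular.

```python
def shape_of(value):
    """Return the shape of a rectangular nested list as a tuple."""
    shape = []
    while isinstance(value, (list, tuple)):
        shape.append(len(value))
        value = value[0] if value else None
    return tuple(shape)


def check_shape(name, value, expected):
    """Raise ValueError unless `value` has exactly the expected shape."""
    actual = shape_of(value)
    if actual != expected:
        raise ValueError(f"{name} must have shape {expected}, got {actual}")
    return value


identity = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
check_shape("rotationMatrix", identity, (3, 3))  # passes silently

try:
    check_shape("volume", [1, 1], (3,))  # one component short
    message = None
except ValueError as err:
    message = str(err)
```

Naming the offending argument and both shapes in the message makes a failed construction much quicker to diagnose.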
class Data3D(Block):
    type = BlockType.data3D

    def __init__(
        self,
        frequency: float,
        nFrames: int,
        volume: np.ndarray,
        rotationMatrix: np.ndarray,
        translationVector: np.ndarray,
        startTime: float = 0.0,
        flag: Flags = Flags.rawData,
        format: Data3dBlockFormat = Data3dBlockFormat.byTrack,
    ) -> None:
        """A data block that contains marker tracks.

        Args:
            frequency (float): data frequency in Hz
            nFrames (int): number of frames in the data block
            volume (Union[Volume,np.ndarray]): acquisition volume
            rotationMatrix (np.ndarray): rotation matrix
            translationVector (np.ndarray): translation vector
            startTime (float, optional): Acquisition start time. Defaults to 0.0.
            flag (Flags, optional): Data 3D block flags. Defaults to Flags.rawData.
            format (Data3dBlockFormat, optional): Data3D format.
             Defaults to Data3dBlockFormat.byTrack.

        Raises:
            ValueError: the volume is not an array of 3 floats
            ValueError: the rotation matrix is not an array of 3x3 floats
            ValueError: the translation vector is not an array of 3 floats
            TypeError: flag is not a Flags member
            TypeError: nFrames is not an integer
        """
        super().__init__()
        self.format = format
        self.frequency = frequency
        self.startTime = startTime

        if not is_nparray_of_shape(rotationMatrix, MAT3X3F.btype.shape):
            raise ValueError(
                f"rotationMatrix must be a numpy array of shape {MAT3X3F.btype.shape}"
            )
        self.rotationMatrix = rotationMatrix

        if not is_nparray_of_shape(translationVector, VEC3F.btype.shape):
            raise ValueError(
                "translationVector must be a numpy array of shape "
                f"{VEC3F.btype.shape}"
            )
        self.translationVector = translationVector

        if not is_nparray_of_shape(volume, Volume.btype.shape):
            raise ValueError(
                f"volume must be a numpy array of shape {Volume.btype.shape}"
            )

        self.volume = volume

        if not isinstance(flag, Flags):
            raise TypeError("flag must be of type Flags")

        self.flag = flag

        if not is_integer(nFrames):
            raise TypeError("nFrames must be an integer")

        self.nFrames = nFrames

        self._tracks = []

    def add_track(self, track: MarkerTrack) -> None:
        """Adds a track to the data block

        Args:
            track (MarkerTrack): track to add

        Raises:
            TypeError: Track is not of type MarkerTrack
            ValueError: Track has a different number of frames than the data block
        """
        if not isinstance(track, MarkerTrack):
            raise TypeError("Track must be of type MarkerTrack")
        if track.nFrames != self.nFrames:
            raise ValueError(
                (
                    f"Track with label {track.label} has"
                    f" {track.nFrames} frames, expected {self.nFrames} frames"
                )
            )
        self._tracks.append(track)

    @property
    def tracks(self) -> List[MarkerTrack]:
        """Returns a list of all tracks in the data block

        Returns:
            List[MarkerTrack]: list of all tracks in the data block
        """
        return self._tracks

    @tracks.setter
    def tracks(self, values: Iterable[MarkerTrack]) -> None:
        """
        Sets the tracks in the data block.
        """
        oldTracks = self._tracks
        self._tracks = []
        try:
            for value in values:
                self.add_track(value)
        except Exception:
            # Restore the previous tracks, then re-raise the original error
            self._tracks = oldTracks
            raise

    @staticmethod
    def _build(stream, format) -> "Data3D":
        format = Data3dBlockFormat(format)
        nFrames = i32.bread(stream)
        frequency = i32.bread(stream)
        startTime = f32.bread(stream)
        nTracks = u32.bread(stream)
        volume = Volume.bread(stream)
        rotationMatrix = MAT3X3F.bread(stream)
        translationVector = VEC3F.bread(stream)
        flag = Flags(u32.bread(stream))

        d = Data3D(
            frequency,
            nFrames,
            volume,
            rotationMatrix,
            translationVector,
            startTime,
            flag,
            format,
        )

        if format in [Data3dBlockFormat.byTrack, Data3dBlockFormat.byFrame]:
            nLinks = i32.bread(stream)
            i32.skip(stream, 1)
            d.links = LinkType.bread(stream, nLinks)

        if format in [
            Data3dBlockFormat.byTrack,
            Data3dBlockFormat.byTrackWithoutLinks,
        ]:
            d._tracks = [MarkerTrack._build(stream, nFrames) for _ in range(nTracks)]
        else:
            raise NotImplementedError(f"Data3D format {format} not implemented yet")
        return d

    def __getitem__(self, key: Union[int, str]) -> MarkerTrack:
        if isinstance(key, int):
            return self._tracks[key]
        elif isinstance(key, str):
            try:
                return next(track for track in self._tracks if track.label == key)
            except StopIteration:
                raise KeyError(f"Track with label {key} not found")
        raise TypeError(f"Invalid key type {type(key)}")

    def __iter__(self) -> Iterator[MarkerTrack]:
        return iter(self._tracks)

    def __len__(self) -> int:
        return len(self._tracks)

    def __eq__(self, other) -> bool:
        buff1 = BytesIO()
        buff2 = BytesIO()
        self._write(buff1)
        other._write(buff2)
        return buff1.getvalue() == buff2.getvalue()

    def __contains__(self, value: Union[MarkerTrack, str]) -> bool:
        if isinstance(value, MarkerTrack):
            return value in self._tracks
        elif isinstance(value, str):
            return any(track.label == value for track in self._tracks)
        raise TypeError(f"Invalid value type {type(value)}")

    @property
    def nTracks(self) -> int:
        """Number of tracks in the data block

        Returns:
            int: number of tracks
        """
        return len(self._tracks)

    def _write(self, file: BinaryIO) -> None:
        if self.format not in [
            Data3dBlockFormat.byTrack,
            Data3dBlockFormat.byTrackWithoutLinks,
        ]:
            raise NotImplementedError(
                f"Data3D format {self.format} not implemented yet"
            )

        # nFrames
        i32.bwrite(file, self.nFrames)
        # frequency
        i32.bwrite(file, self.frequency)
        # startTime
        f32.bwrite(file, self.startTime)
        # nTracks
        u32.bwrite(file, len(self._tracks))

        # volume
        Volume.bwrite(file, self.volume)
        # rotationMatrix
        MAT3X3F.bwrite(file, self.rotationMatrix)
        # translationVector
        VEC3F.bwrite(file, self.translationVector)
        # flags
        u32.bwrite(file, self.flag.value)

        if self.format in [
            Data3dBlockFormat.byFrame,
            Data3dBlockFormat.byTrack,
        ]:
            links = self.links if hasattr(self, "links") else []
            nLinks = len(links)

            # nLinks
            i32.bwrite(file, nLinks)
            # padding
            i32.bpad(file)
            # links
            LinkType.bwrite(file, links)

        for track in self._tracks:
            track._write(file)

    @property
    def nBytes(self) -> int:
        base = (
            4  # nFrames
            + 4  # frequency
            + 4  # startTime
            + 4  # nTracks
            + Volume.btype.itemsize  # volume
            + MAT3X3F.btype.itemsize  # rotationMatrix
            + VEC3F.btype.itemsize  # translationVector
            + 4  # flags
        )

        if self.format in [
            Data3dBlockFormat.byFrame,
            Data3dBlockFormat.byTrack,
        ]:
            links_size = (
                4
                + 4
                + (
                    LinkType.btype.itemsize * len(self.links)
                    if hasattr(self, "links")
                    else 0
                )
            )
            base += links_size

        for track in self._tracks:
            base += track.nBytes

        return base

    def __repr__(self) -> str:
        return (
            f"<Data3D "
            f"format={self.format.name}, "
            f"nFrames={self.nFrames}, "
            f"frequency={self.frequency}, "
            f"startTime={self.startTime}, "
            f"nTracks={self.nTracks}, "
            f"tracks={[i.label for i in self._tracks]}>"
        )
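
The fixed part of the `nBytes` sum can be cross-checked with the standard `struct` module. This is a sketch under assumptions not confirmed by the listing: that the volume and translation vector are three 32-bit floats each, that the rotation matrix is nine, and that the layout is little-endian without padding. `HEADER_FORMAT` is a name invented for this example.

```python
import struct

# nFrames (i32), frequency (i32), startTime (f32), nTracks (u32),
# volume (3 x f32), rotationMatrix (9 x f32), translationVector (3 x f32),
# flags (u32). "<" selects little-endian with no alignment padding.
HEADER_FORMAT = "<iifI3f9f3fI"

header_size = struct.calcsize(HEADER_FORMAT)

# Formats that store links add nLinks (i32) plus one i32 of padding
links_prefix_size = struct.calcsize("<ii")

packed = struct.pack(
    HEADER_FORMAT,
    1000, 1000, 0.0, 1,                            # nFrames, frequency, startTime, nTracks
    1.0, 1.0, 1.0,                                 # volume
    1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0,   # rotation matrix (identity)
    0.0, 0.0, 0.0,                                 # translation vector
    0,                                             # flags
)
```

Under those assumptions the header is 80 bytes, and a block that stores links adds at least 8 more before the link records and track data.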

Ancestors

  • Block
  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Class variables

var type

Instance variables

var nBytes : int

Size in bytes

@property
def nBytes(self) -> int:
    base = (
        4  # nFrames
        + 4  # frequency
        + 4  # startTime
        + 4  # nTracks
        + Volume.btype.itemsize  # volume
        + MAT3X3F.btype.itemsize  # rotationMatrix
        + VEC3F.btype.itemsize  # translationVector
        + 4  # flags
    )

    if self.format in [
        Data3dBlockFormat.byFrame,
        Data3dBlockFormat.byTrack,
    ]:
        links_size = (
            4
            + 4
            + (
                LinkType.btype.itemsize * len(self.links)
                if hasattr(self, "links")
                else 0
            )
        )
        base += links_size

    for track in self._tracks:
        base += track.nBytes

    return base
var nTracks : int

Number of tracks in the data block

Returns

int
number of tracks
@property
def nTracks(self) -> int:
    """Number of tracks in the data block

    Returns:
        int: number of tracks
    """
    return len(self._tracks)
var tracks : List[MarkerTrack]

Returns a list of all tracks in the data block

Returns

List[MarkerTrack]
list of all tracks in the data block
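
Assigning to `tracks` replaces the whole list at once: every new track goes through `add_track`, and if any of them is rejected the previous list is restored. A minimal sketch of that validate-or-roll-back pattern, using a hypothetical `RollbackBlock` class with plain lists standing in for marker tracks:

```python
class RollbackBlock:
    """Hypothetical container; only mimics the validate-then-append pattern."""

    def __init__(self, n_frames):
        self.n_frames = n_frames
        self._tracks = []

    def add_track(self, track):
        if len(track) != self.n_frames:
            raise ValueError(f"expected {self.n_frames} frames, got {len(track)}")
        self._tracks.append(track)

    @property
    def tracks(self):
        return self._tracks

    @tracks.setter
    def tracks(self, values):
        old_tracks = self._tracks
        self._tracks = []
        try:
            for value in values:
                self.add_track(value)
        except Exception:
            self._tracks = old_tracks  # roll back to the previous list
            raise


block = RollbackBlock(n_frames=3)
block.tracks = [[1, 2, 3], [4, 5, 6]]

try:
    block.tracks = [[7, 8, 9], [0]]  # second track is too short
except ValueError:
    pass

kept = block.tracks
```

The failed assignment leaves the block exactly as it was, including the first track of the rejected batch not being kept.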
@property
def tracks(self) -> List[MarkerTrack]:
    """Returns a list of all tracks in the data block

    Returns:
        List[MarkerTrack]: list of all tracks in the data block
    """
    return self._tracks

Methods

def add_track(self, track: MarkerTrack) ‑> None

Adds a track to the data block

Args

track : MarkerTrack
track to add

Raises

TypeError
Track is not of type MarkerTrack
ValueError
Track has a different number of frames than the data block
def add_track(self, track: MarkerTrack) -> None:
    """Adds a track to the data block

    Args:
        track (MarkerTrack): track to add

    Raises:
        TypeError: Track is not of type MarkerTrack
        ValueError: Track has a different number of frames than the data block
    """
    if not isinstance(track, MarkerTrack):
        raise TypeError("Track must be of type MarkerTrack")
    if track.nFrames != self.nFrames:
        raise ValueError(
            (
                f"Track with label {track.label} has"
                f" {track.nFrames} frames, expected {self.nFrames} frames"
            )
        )
    self._tracks.append(track)
class EMG (frequency, nSamples, startTime=0.0, format=EMGBlockFormat.byTrack)

Electromyography data block

class EMG(Block):
    """Electromyography data block"""

    type = BlockType.electromyographicData

    def __init__(
        self, frequency, nSamples, startTime=0.0, format=EMGBlockFormat.byTrack
    ) -> None:
        super().__init__()
        self.frequency = frequency
        self.startTime = startTime
        self.nSamples = nSamples
        self._signals = []
        self._emgMap = []
        self.format = format

    @staticmethod
    def _build(stream, format) -> "EMG":
        format = EMGBlockFormat(format)
        nSignals = i32.bread(stream)
        frequency = i32.bread(stream)
        startTime = f32.bread(stream)
        nSamples = i32.bread(stream) + 49  # Why 49??? Whyyyy????
        emgMap = i16.bread(stream, n=nSignals)

        d = EMG(frequency, nSamples, startTime, format)
        if format == EMGBlockFormat.byTrack:
            for n in range(nSignals):
                emgSignal = EMGTrack._build(stream, nSamples)
                d.addSignal(emgSignal, channel=emgMap[n])
        else:
            raise NotImplementedError(f"EMG format {format} not implemented yet")
        return d

    def _write(self, file) -> None:
        if self.format != EMGBlockFormat.byTrack:
            raise NotImplementedError(f"EMG format {self.format} not implemented yet")

        # nSignals
        i32.bwrite(file, len(self._signals))

        # frequency
        i32.bwrite(file, self.frequency)

        # startTime
        f32.bwrite(file, self.startTime)

        # nSamples
        i32.bwrite(file, self.nSamples - 49)  # That 49 again

        # emgMap
        i16.bwrite(file, self._emgMap)

        # signals
        for signal in self._signals:
            signal._write(file)

    def __getitem__(self, key) -> EMGTrack:
        """
        Returns the EMG signal specified by its label or its channel

        Args:
            key (int or str): The channel or the label of the signal

        Raises:
            TypeError: If the key is not an int or a str
            KeyError: If the signal is not found
        """
        if isinstance(key, int):
            return self._signals[key]
        elif isinstance(key, str):
            try:
                return next(signal for signal in self._signals if signal.label == key)
            except StopIteration:
                raise KeyError(f"EMG signal with label {key} not found")
        raise TypeError(f"Invalid key type {type(key)}")

    def __contains__(self, value: Union[EMGTrack, str]) -> bool:
        if isinstance(value, str):
            return any(signal.label == value for signal in self._signals)
        elif isinstance(value, EMGTrack):
            return value in self._signals
        raise TypeError(f"Invalid value type {type(value)}")

    def __iter__(self) -> Iterator[EMGTrack]:
        return iter(self._signals)

    def __len__(self) -> int:
        return len(self._signals)

    def __eq__(self, other) -> bool:
        if not isinstance(other, EMG):
            return False
        return (
            self.frequency == other.frequency
            and self.startTime == other.startTime
            and self.nSamples == other.nSamples
            and all(s1 == s2 for s1, s2 in zip(self._signals, other._signals))
        )

    def addSignal(self, signal: EMGTrack, channel=None) -> None:
        """
        Adds a signal to the EMG block. If the channel is not specified,
        it is set to the next one  available

        Args:
            signal (EMGTrack): The signal to add
            channel (int, optional): The channel to use. Defaults to None.

        Raises:
            TypeError: If the signal is not an EMGTrack
            ValueError: If the signal has a different number of samples
            ValueError: If the channel is already in use
        """
        if not isinstance(signal, EMGTrack):
            raise TypeError(f"Can only add EMGTrack objects, got {type(signal)}")
        if signal.nSamples != self.nSamples:
            raise ValueError(
                (
                    f"EMGTrack with label {signal.label} has {signal.nSamples} "
                    f"samples, expected {self.nSamples}"
                )
            )

        if channel is None:
            if len(self._emgMap) == 0:
                next_channel = 0
            else:
                next_channel = max(self._emgMap) + 1
            self._emgMap.append(next_channel)
        else:
            if channel in self._emgMap:
                raise ValueError(f"Channel {channel} already in use")
            self._emgMap.append(channel)
        self._signals.append(signal)

    def removeSignal(self, label: str) -> None:
        """
        Removes a signal specified by its label from the EMG block

        Args:
            label (str): The label of the signal to remove

        Raises:
            KeyError: If the signal is not found
        """
        try:
            pos = next(i for i, v in enumerate(self._signals) if v.label == label)
        except StopIteration:
            raise KeyError(f"EMG signal with label {label} not found")

        del self._signals[pos]
        del self._emgMap[pos]

    @property
    def nBytes(self) -> int:
        base = 4 + 4 + 4 + 2 * len(self._signals) + 4
        for signal in self._signals:
            base += signal.nBytes
        return base

    @property
    def nSignals(self) -> int:
        return len(self._signals)

    def __repr__(self) -> str:
        return (
            "<EMGBlock"
            f" format={self.format.name}"
            f" frequency={self.frequency}"
            f" nSamples={self.nSamples}"
            f" nSignals={self.nSignals}"
            f" startTime={self.startTime}"
            ">"
        )

Ancestors

  • Block
  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Class variables

var type

Instance variables

var nBytes : int

Size in bytes
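
The fixed-size part of this sum can be reproduced with `struct`. The sketch assumes a little-endian, unpadded layout, which the listing does not confirm; `emg_header_size` is a hypothetical helper.

```python
import struct


def emg_header_size(n_signals):
    """Bytes used before the signal data for a block with n_signals signals."""
    # nSignals (i32), frequency (i32), startTime (f32), nSamples (i32),
    # followed by one i16 channel number per signal (the EMG map)
    return struct.calcsize(f"<iifi{n_signals}h")


empty_block = emg_header_size(0)
three_signals = emg_header_size(3)
```

Each added signal costs two bytes in the header for its channel number, on top of the bytes of the signal itself.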

@property
def nBytes(self) -> int:
    base = 4 + 4 + 4 + 2 * len(self._signals) + 4
    for signal in self._signals:
        base += signal.nBytes
    return base
var nSignals : int
@property
def nSignals(self) -> int:
    return len(self._signals)

Methods

def addSignal(self, signal: EMGTrack, channel=None) ‑> None

Adds a signal to the EMG block. If the channel is not specified, the signal gets the channel after the highest one already in use (0 for an empty block).

Args

signal : EMGTrack
The signal to add
channel : int, optional
The channel to use. Defaults to None.

Raises

TypeError
If the signal is not an EMGTrack
ValueError
If the signal has a different number of samples
ValueError
If the channel is already in use
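
The channel rule is small enough to sketch on its own. `assign_channel` below is a hypothetical helper, not part of basictdf, that follows the same logic: without an explicit channel it takes one past the current maximum, so gaps left by explicit assignments are not filled.

```python
def assign_channel(used_channels, channel=None):
    """Append a channel number to used_channels and return it."""
    if channel is None:
        # One past the highest channel in use, or 0 for an empty map
        channel = max(used_channels) + 1 if used_channels else 0
    elif channel in used_channels:
        raise ValueError(f"Channel {channel} already in use")
    used_channels.append(channel)
    return channel


emg_map = []
a = assign_channel(emg_map)      # empty map: channel 0
b = assign_channel(emg_map, 5)   # explicit channel
c = assign_channel(emg_map)      # one past the maximum: 6, not 1

try:
    assign_channel(emg_map, 5)
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```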
def addSignal(self, signal: EMGTrack, channel=None) -> None:
    """
    Adds a signal to the EMG block. If the channel is not specified,
    it is set to the next one  available

    Args:
        signal (EMGTrack): The signal to add
        channel (int, optional): The channel to use. Defaults to None.

    Raises:
        TypeError: If the signal is not an EMGTrack
        ValueError: If the signal has a different number of samples
        ValueError: If the channel is already in use
    """
    if not isinstance(signal, EMGTrack):
        raise TypeError(f"Can only add EMGTrack objects, got {type(signal)}")
    if signal.nSamples != self.nSamples:
        raise ValueError(
            (
                f"EMGTrack with label {signal.label} has {signal.nSamples} "
                f"samples, expected {self.nSamples}"
            )
        )

    if channel is None:
        if len(self._emgMap) == 0:
            next_channel = 0
        else:
            next_channel = max(self._emgMap) + 1
        self._emgMap.append(next_channel)
    else:
        if channel in self._emgMap:
            raise ValueError(f"Channel {channel} already in use")
        self._emgMap.append(channel)
    self._signals.append(signal)
def removeSignal(self, label: str) ‑> None

Removes a signal specified by its label from the EMG block

Args

label : str
The label of the signal to remove

Raises

KeyError
If the signal is not found
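
Signals and their channel numbers live in two parallel lists, so a removal has to delete the same position from both. A stdlib sketch of that bookkeeping, with plain label strings standing in for signals; `remove_by_label` is a hypothetical helper:

```python
def remove_by_label(labels, channels, label):
    """Delete the entry for `label` from two parallel lists."""
    try:
        pos = labels.index(label)
    except ValueError:
        raise KeyError(f"EMG signal with label {label} not found") from None
    del labels[pos]
    del channels[pos]


labels = ["biceps", "triceps", "deltoid"]
channels = [0, 1, 2]
remove_by_label(labels, channels, "triceps")

try:
    remove_by_label(labels, channels, "soleus")
    missing_rejected = False
except KeyError:
    missing_rejected = True
```

Remaining signals keep their original channel numbers; nothing is renumbered after a removal.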
def removeSignal(self, label: str) -> None:
    """
    Removes a signal specified by its label from the EMG block

    Args:
        label (str): The label of the signal to remove

    Raises:
        KeyError: If the signal is not found
    """
    try:
        pos = next(i for i, v in enumerate(self._signals) if v.label == label)
    except StopIteration:
        raise KeyError(f"EMG signal with label {label} not found")

    del self._signals[pos]
    del self._emgMap[pos]
class EMGTrack (label: str, trackData: numpy.ndarray)

A single EMG signal: a label plus a one-dimensional array of samples, in which NaN values mark gaps between recorded segments.

class EMGTrack(Sized, BuildWriteable):
    def __init__(self, label: str, trackData: np.ndarray) -> None:
        self.label = label
        self.data = trackData

    @property
    def nSamples(self) -> int:
        """
        Returns:
            int: number of samples of the track
        """
        return self.data.shape[0]

    @property
    def _segments(self):
        maskedTrackData = np.ma.masked_invalid(self.data)
        return np.ma.clump_unmasked(maskedTrackData.T)

    @staticmethod
    def _build(stream, nSamples) -> "EMGTrack":
        label = BTSString.bread(stream, 256)
        nSegments = i32.bread(stream)
        i32.skip(stream)  # padding
        segmentData = SegmentData.bread(stream, nSegments)
        trackData = np.empty(nSamples, dtype="<f4")
        trackData[:] = np.nan
        for startFrame, nFrames in segmentData:
            trackData[startFrame : startFrame + nFrames] = f32.bread(stream, nFrames)
        return EMGTrack(label, trackData)

    def _write(self, file) -> None:
        # label
        BTSString.bwrite(file, 256, self.label)

        segments = self._segments

        # nSegments
        i32.bwrite(file, len(segments))

        # padding
        i32.bpad(file, 1)

        for segment in segments:
            # startFrame
            i32.bwrite(file, segment.start)
            # nFrames
            i32.bwrite(file, segment.stop - segment.start)

        for segment in segments:
            # data
            f32.bwrite(file, self.data[segment])

    @property
    def nBytes(self):
        base = 256 + 4 + 4
        for segment in self._segments:
            base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize
        return base

    def __eq__(self, other):
        return self.label == other.label and np.all(self.data == other.data)

    def __repr__(self) -> str:
        return (
            f"EMGTrack(label={self.label}, nSamples={self.nSamples}, "
            f"segments={len(self._segments)})"
        )

Ancestors

  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Instance variables

var nBytes

Size in bytes
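
The size follows from how a track is written: a 256-byte label, the segment count, four bytes of padding, then a (start, length) pair and the samples for each run of valid data. A stdlib sketch of that bookkeeping, assuming 4-byte samples and NaN as the gap marker; `valid_segments` and `track_size` are names made up for this example.

```python
import math


def valid_segments(samples):
    """Return (start, length) for each run of consecutive non-NaN samples."""
    segments = []
    start = None
    for i, value in enumerate(samples):
        if math.isnan(value):
            if start is not None:
                segments.append((start, i - start))
                start = None
        elif start is None:
            start = i
    if start is not None:
        segments.append((start, len(samples) - start))
    return segments


def track_size(samples, label_size=256, sample_size=4):
    """Bytes for label, nSegments, padding, and each segment's header and data."""
    size = label_size + 4 + 4
    for _, length in valid_segments(samples):
        size += 4 + 4 + length * sample_size
    return size


nan = float("nan")
samples = [0.1, 0.2, nan, nan, 0.5, 0.6, 0.7, nan]
segments = valid_segments(samples)
size = track_size(samples)
```

Gaps cost nothing in sample data, but each extra segment adds 8 bytes of header, so a heavily fragmented track can end up larger than expected.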

@property
def nBytes(self):
    base = 256 + 4 + 4
    for segment in self._segments:
        base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize
    return base
var nSamples : int

Returns

int
number of samples of the track
@property
def nSamples(self) -> int:
    """
    Returns:
        int: number of samples of the track
    """
    return self.data.shape[0]
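
Because gaps are stored as NaN and NaN never compares equal to itself, an element-wise `==` comparison reports two tracks with identical gaps as different. The following stdlib sketch shows the effect and one NaN-aware alternative; `samples_equal` is a hypothetical helper, not a basictdf function.

```python
import math


def samples_equal(a, b):
    """Compare two sample sequences, treating NaN gaps as equal to each other."""
    if len(a) != len(b):
        return False
    return all(
        x == y or (math.isnan(x) and math.isnan(y))
        for x, y in zip(a, b)
    )


nan = float("nan")
first = [0.1, nan, 0.3]
second = [0.1, nan, 0.3]

# Element-wise comparison: the NaN pair makes this False
naive = all(x == y for x, y in zip(first, second))

# NaN-aware comparison: gaps in the same positions count as equal
nan_aware = samples_equal(first, second)
```

With NumPy arrays, `np.array_equal(a, b, equal_nan=True)` gives the same NaN-aware result.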
class ForcePlatformsDataBlock (label: str, trackData: numpy.ndarray)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class EMGTrack(Sized, BuildWriteable):
    def __init__(self, label: str, trackData: np.ndarray) -> None:
        self.label = label
        self.data = trackData

    @property
    def nSamples(self) -> int:
        """
        Returns:
            int: number of samples of the track
        """
        return self.data.shape[0]

    @property
    def _segments(self):
        maskedTrackData = np.ma.masked_invalid(self.data)
        return np.ma.clump_unmasked(maskedTrackData.T)

    @staticmethod
    def _build(stream, nSamples) -> "EMGTrack":
        label = BTSString.bread(stream, 256)
        nSegments = i32.bread(stream)
        i32.skip(stream)  # padding
        segmentData = SegmentData.bread(stream, nSegments)
        trackData = np.empty(nSamples, dtype="<f4")
        trackData[:] = np.nan
        for startFrame, nFrames in segmentData:
            trackData[startFrame : startFrame + nFrames] = f32.bread(stream, nFrames)
        return EMGTrack(label, trackData)

    def _write(self, file) -> None:
        # label
        BTSString.bwrite(file, 256, self.label)

        segments = self._segments

        # nSegments
        i32.bwrite(file, len(segments))

        # padding
        i32.bpad(file, 1)

        for segment in segments:
            # startFrame
            i32.bwrite(file, segment.start)
            # nFrames
            i32.bwrite(file, segment.stop - segment.start)

        for segment in segments:
            # data
            f32.bwrite(file, self.data[segment])

    @property
    def nBytes(self):
        base = 256 + 4 + 4
        for segment in self._segments:
            base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize
        return base

    def __eq__(self, other):
        return self.label == other.label and np.all(self.data == other.data)

    def __repr__(self) -> str:
        return (
            f"EMGTrack(label={self.label}, nSamples={self.nSamples},"
            f"segments={len(self._segments)})"
        )

Ancestors

  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Instance variables

var nBytes

Size in bytes

Expand source code
@property
def nBytes(self):
    base = 256 + 4 + 4
    for segment in self._segments:
        base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize
    return base
var nSamples : int

Returns

int
number of samples of the track
Expand source code
@property
def nSamples(self) -> int:
    """
    Returns:
        int: number of samples of the track
    """
    return self.data.shape[0]
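
As a rough illustration of how nBytes follows from the segment layout written by _write, the sketch below redoes the arithmetic in plain Python. The helpers find_segments and track_nbytes are hypothetical names, not part of basictdf, and the sketch assumes 4-byte samples (matching f32) and NaN as the marker for missing samples.

```python
import math


def find_segments(samples):
    """Return (startFrame, nFrames) pairs for each run of non-NaN samples."""
    segments = []
    start = None
    for i, value in enumerate(samples):
        if math.isnan(value):
            if start is not None:
                segments.append((start, i - start))
                start = None
        elif start is None:
            start = i
    if start is not None:
        segments.append((start, len(samples) - start))
    return segments


def track_nbytes(samples, itemsize=4):
    # 256-byte label + 4-byte nSegments + 4 bytes of padding
    total = 256 + 4 + 4
    for _, n_frames in find_segments(samples):
        # each segment stores startFrame, nFrames, then its samples
        total += 4 + 4 + n_frames * itemsize
    return total


nan = float("nan")
samples = [nan, 1.0, 2.0, nan, nan, 3.0, 4.0, 5.0]
segments = find_segments(samples)
size = track_nbytes(samples)
print(segments, size)
```

Only the valid runs are stored, so a track with long gaps takes less space on disk than its nSamples would suggest.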
class ForcePlatformData (label: str, trackData: numpy.ndarray)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class EMGTrack(Sized, BuildWriteable):
    def __init__(self, label: str, trackData: np.ndarray) -> None:
        self.label = label
        self.data = trackData

    @property
    def nSamples(self) -> int:
        """
        Returns:
            int: number of samples of the track
        """
        return self.data.shape[0]

    @property
    def _segments(self):
        maskedTrackData = np.ma.masked_invalid(self.data)
        return np.ma.clump_unmasked(maskedTrackData.T)

    @staticmethod
    def _build(stream, nSamples) -> "EMGTrack":
        label = BTSString.bread(stream, 256)
        nSegments = i32.bread(stream)
        i32.skip(stream)  # padding
        segmentData = SegmentData.bread(stream, nSegments)
        trackData = np.empty(nSamples, dtype="<f4")
        trackData[:] = np.nan
        for startFrame, nFrames in segmentData:
            trackData[startFrame : startFrame + nFrames] = f32.bread(stream, nFrames)
        return EMGTrack(label, trackData)

    def _write(self, file) -> None:
        # label
        BTSString.bwrite(file, 256, self.label)

        segments = self._segments

        # nSegments
        i32.bwrite(file, len(segments))

        # padding
        i32.bpad(file, 1)

        for segment in segments:
            # startFrame
            i32.bwrite(file, segment.start)
            # nFrames
            i32.bwrite(file, segment.stop - segment.start)

        for segment in segments:
            # data
            f32.bwrite(file, self.data[segment])

    @property
    def nBytes(self):
        base = 256 + 4 + 4
        for segment in self._segments:
            base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize
        return base

    def __eq__(self, other):
        # equal_nan=True lets tracks with NaN gaps compare equal to themselves
        return self.label == other.label and np.array_equal(
            self.data, other.data, equal_nan=True
        )

    def __repr__(self) -> str:
        return (
            f"EMGTrack(label={self.label}, nSamples={self.nSamples}, "
            f"segments={len(self._segments)})"
        )

Ancestors

  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Instance variables

var nBytes

Size in bytes

Expand source code
@property
def nBytes(self):
    base = 256 + 4 + 4
    for segment in self._segments:
        base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize
    return base
var nSamples : int

Returns

int
number of samples of the track
Expand source code
@property
def nSamples(self) -> int:
    """
    Returns:
        int: number of samples of the track
    """
    return self.data.shape[0]
class Event (label, values=[], type=EventsDataType.singleEvent)

A class representing a single event or a sequence of events.

Expand source code
class Event(Sized, BuildWriteable):
    """
    A class representing a single event or a sequence of events.
    """

    def __init__(self, label, values=[], type=EventsDataType.singleEvent) -> None:
        self.label = label
        self.type = type
        if not is_iterable(values):
            raise TypeError("Values must be iterable")
        if isinstance(values, np.ndarray) and values.dtype == np.dtype("<f4"):
            self.values = values
        else:
            self.values = np.array(values, dtype="<f4")

        if len(values) > 1 and type == EventsDataType.singleEvent:
            raise TypeError("Can't have more than one value for a single event")

    def _write(self, stream) -> None:
        BTSString.bwrite(stream, 256, self.label)
        u32.bwrite(stream, self.type.value)  # type
        u32.bwrite(stream, len(self.values))  # nItems
        f32.bwrite(stream, self.values)

    @staticmethod
    def _build(stream) -> "Event":
        label = BTSString.bread(stream, 256)
        type_ = EventsDataType(u32.bread(stream))
        nItems = i32.bread(stream)
        values = np.array([f32.bread(stream) for _ in range(nItems)])
        return Event(label, values, type_)

    def __len__(self) -> int:
        return len(self.values)

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, Event):
            return False
        return (
            self.label == o.label
            and self.type == o.type
            and np.array_equal(self.values, o.values)
        )

    @property
    def nBytes(self) -> int:
        return 256 + 4 + 4 + len(self.values) * 4

    def __repr__(self) -> str:
        return (
            "<Event "
            f"label={self.label} "
            f"type={self.type} "
            f"nItems={len(self.values)} "
            f"values={self.values}>"
        )

Ancestors

  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Instance variables

var nBytes : int

Size in bytes

Expand source code
@property
def nBytes(self) -> int:
    return 256 + 4 + 4 + len(self.values) * 4
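
The record written by Event._write can be approximated with the standard struct module to check the nBytes formula. This is a minimal sketch under stated assumptions: pack_event is a hypothetical helper, the byte order is taken to be little-endian (as the "<f4" dtype suggests), and the label is encoded as null-padded ASCII, which may differ from what BTSString actually does.

```python
import struct

LABEL_SIZE = 256  # fixed label width used by Event._write


def pack_event(label, event_type, values):
    """Pack one event record; the label encoding is an assumption."""
    raw_label = label.encode("ascii")[:LABEL_SIZE].ljust(LABEL_SIZE, b"\x00")
    header = struct.pack("<II", event_type, len(values))  # type, nItems
    body = struct.pack(f"<{len(values)}f", *values)  # float32 values
    return raw_label + header + body


record = pack_event("heel strike", 1, [0.5, 1.25, 2.0])
expected_size = 256 + 4 + 4 + 3 * 4  # same formula as Event.nBytes
print(len(record), expected_size)
```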
class EventsDataType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class EventsDataType(Enum):
    singleEvent = 0
    eventSequence = 1

Ancestors

  • enum.Enum

Class variables

var eventSequence
var singleEvent
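
The enum values are the integers stored in the file, so a member can be recovered from the stored number, as Event._build does. The sketch below uses a local copy of the enum so that it runs without basictdf installed; check_values is a hypothetical helper that mirrors the length check in Event.__init__.

```python
from enum import Enum


class EventsDataType(Enum):  # local copy of the enum shown above
    singleEvent = 0
    eventSequence = 1


def check_values(values, type_):
    """Reject more than one value for a single event."""
    if len(values) > 1 and type_ == EventsDataType.singleEvent:
        raise TypeError("Can't have more than one value for a single event")
    return True


kind = EventsDataType(1)  # look up a member from its stored integer
ok = check_values([0.1, 0.2], kind)

try:
    check_values([0.1, 0.2], EventsDataType.singleEvent)
    rejected = False
except TypeError:
    rejected = True
```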
class MarkerTrack (label: str, track_data: numpy.ndarray)

A track that collects all the data of a physical marker, such as name and position.

Expand source code
class MarkerTrack(Sized, BuildWriteable):
    """
    A track that collects all the data of a physical marker, such as name and position.
    """

    def __init__(self, label: str, track_data: np.ndarray) -> None:
        self.label = label
        "The name of the marker"
        self.data = track_data
        "The actual marker data"

    @property
    def X(self) -> np.ndarray:
        """
        Convenience property that returns or sets the
        X component of the marker position.
        """
        return self.data[:, 0]

    @property
    def Y(self) -> np.ndarray:
        """
        Convenience property that returns or sets the
        Y component of the marker position.
        """
        return self.data[:, 1]

    @property
    def Z(self) -> np.ndarray:
        """
        Convenience property that returns or sets the
        Z component of the marker position.
        """
        return self.data[:, 2]

    @X.setter
    def X(self, value) -> None:
        self.data[:, 0] = value

    @Y.setter
    def Y(self, value) -> None:
        self.data[:, 1] = value

    @Z.setter
    def Z(self, value) -> None:
        self.data[:, 2] = value

    @property
    def nFrames(self) -> int:
        """
        Returns:
            int: number of frames in the track
        """
        return self.data.shape[0]

    @property
    def _segments(self):
        maskedTrackData = np.ma.masked_invalid(self.data)
        return np.ma.clump_unmasked(maskedTrackData.T[0])

    @staticmethod
    def _build(stream, nFrames: int) -> "MarkerTrack":
        trackData = np.empty(nFrames, dtype=TrackType.btype)
        trackData[:] = np.nan

        label = BTSString.bread(stream, 256)
        nSegments = i32.bread(stream)
        i32.skip(stream)
        segmentData = SegmentData.bread(stream, nSegments)
        for startFrame, nFrames in segmentData:
            dat = TrackType.bread(stream, nFrames)
            trackData[startFrame : startFrame + nFrames] = dat
        return MarkerTrack(label, trackData)

    def _write(self, file) -> None:
        # label
        BTSString.bwrite(file, 256, self.label)

        segments = self._segments

        # nSegments
        i32.bwrite(file, len(segments))

        # padding
        i32.bpad(file, 1)

        for segment in segments:
            # startFrame
            i32.bwrite(file, np.array(segment.start))
            # nFrames
            i32.bwrite(file, np.array(segment.stop - segment.start))

        for segment in segments:
            # trackData
            TrackType.bwrite(file, self.data[segment])

    @property
    def nBytes(self) -> int:
        """
        Returns:
            int: size of the track in bytes
        """
        base = 256 + 4 + 4
        for segment in self._segments:
            base += 4 + 4 + (segment.stop - segment.start) * TrackType.btype.itemsize
        return base

    def __repr__(self) -> str:
        return f"MarkerTrack(label={self.label}, nFrames={self.nFrames})"

    def __eq__(self, other):
        # equal_nan=True lets tracks with NaN gaps compare equal to themselves
        return self.label == other.label and np.array_equal(
            self.data, other.data, equal_nan=True
        )

Ancestors

  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Instance variables

var X : numpy.ndarray

Convenience property that returns or sets the X component of the marker position.

Expand source code
@property
def X(self) -> np.ndarray:
    """
    Convenience property that returns or sets the
    X component of the marker position.
    """
    return self.data[:, 0]
var Y : numpy.ndarray

Convenience property that returns or sets the Y component of the marker position.

Expand source code
@property
def Y(self) -> np.ndarray:
    """
    Convenience property that returns or sets the
    Y component of the marker position.
    """
    return self.data[:, 1]
var Z : numpy.ndarray

Convenience property that returns or sets the Z component of the marker position.

Expand source code
@property
def Z(self) -> np.ndarray:
    """
    Convenience property that returns or sets the
    Z component of the marker position.
    """
    return self.data[:, 2]
var data

The actual marker data

var label

The name of the marker

var nBytes : int

Returns

int
size of the track in bytes
Expand source code
@property
def nBytes(self) -> int:
    """
    Returns:
        int: size of the track in bytes
    """
    base = 256 + 4 + 4
    for segment in self._segments:
        base += 4 + 4 + (segment.stop - segment.start) * TrackType.btype.itemsize
    return base
var nFrames : int

Returns

int
number of frames in the track
Expand source code
@property
def nFrames(self) -> int:
    """
    Returns:
        int: number of frames in the track
    """
    return self.data.shape[0]
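
MarkerTrack._segments inspects only the X column (maskedTrackData.T[0]), so a frame counts as missing when its X value is NaN. The plain-Python sketch below imitates that rule and the nBytes arithmetic. The names marker_segments and marker_nbytes are hypothetical, and the 12-byte frame size (three 4-byte floats) is an assumption about TrackType.btype.itemsize.

```python
import math


def marker_segments(frames):
    """Runs of frames whose X value is not NaN, as (startFrame, nFrames)."""
    segments = []
    start = None
    for i, (x, _y, _z) in enumerate(frames):
        if math.isnan(x):
            if start is not None:
                segments.append((start, i - start))
                start = None
        elif start is None:
            start = i
    if start is not None:
        segments.append((start, len(frames) - start))
    return segments


def marker_nbytes(frames, frame_size=12):
    # label (256) + nSegments (4) + padding (4)
    total = 256 + 4 + 4
    for _, n_frames in marker_segments(frames):
        # startFrame (4) + nFrames (4) + the frames of the segment
        total += 4 + 4 + n_frames * frame_size
    return total


nan = float("nan")
frames = [
    (0.0, 0.0, 0.0),
    (0.1, 0.0, 0.0),
    (nan, nan, nan),  # marker lost for one frame
    (0.3, 0.0, 0.0),
]
segments = marker_segments(frames)
size = marker_nbytes(frames)
```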
class Tdf (filename: Union[pathlib.Path, str])
Expand source code
class Tdf:
    SIGNATURE = b"\x82K`A\xd3\x11\x84\xca`\x00\xb6\xac\x16h\x0c\x08"

    def __init__(self, filename: Union[Path, str]) -> None:
        self.file_path = Path(filename)

        if not self.file_path.exists():
            raise FileNotFoundError(f"File {self.file_path} not found")

        self._mode = "rb"
        self._inside_context = False

    def allow_write(self) -> "Tdf":
        """Allow writing to the file."""
        self._mode = "r+b"
        return self

    def __enter__(self) -> "Tdf":
        self._inside_context = True
        self.handler: IO[bytes] = self.file_path.open(self._mode)

        self.signature = self.handler.read(len(self.SIGNATURE))

        if self.signature != self.SIGNATURE:
            raise Exception("Invalid TDF file")

        self.version = u32.bread(self.handler)
        self.nEntries = i32.bread(self.handler)

        # pad 8 bytes
        i32.skip(self.handler, 2)

        self.creation_date = BTSDate.bread(self.handler)
        self.last_modification_date = BTSDate.bread(self.handler)
        self.last_access_date = BTSDate.bread(self.handler)

        # pad 20 bytes
        i32.skip(self.handler, 5)

        self.entries = [TdfEntry._build(self.handler) for _ in range(self.nEntries)]
        "List of entries in the file"

        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self._inside_context = False
        self._mode = "rb"
        self.handler.close()

    @property
    @provide_context_if_needed
    def blocks(self) -> List[Block]:
        """Get all blocks in the file."""
        return [self.get_block(entry.type) for entry in self.entries]

    @provide_context_if_needed
    def get_block(self, index_or_type: Union[BlockType, int]) -> Optional[Type[Block]]:
        """Get a block from the TDF file."""

        if isinstance(index_or_type, int):
            if 0 <= index_or_type < len(self.entries):
                entry = self.entries[index_or_type]
            else:
                raise IndexError(f"Index {index_or_type} out of range")

        elif isinstance(index_or_type, BlockType):
            entry = next((e for e in self.entries if e.type == index_or_type), None)
            if entry is None:
                raise Exception(f"Block {index_or_type} not found")

        else:
            raise TypeError(f"Expected int or BlockType, got {type(index_or_type)}")

        self.handler.seek(entry.offset, 0)
        block_class = _get_block_class(entry.type)
        return block_class._build(self.handler, entry.format)

    def __getitem__(
        self, index_or_type: Union[BlockType, int]
    ) -> Optional[Type[Block]]:
        return self.get_block(index_or_type)

    @property
    @provide_context_if_needed
    def data3D(self) -> Optional[Data3D]:
        """
        Convenience property to get/set the 3D data block.
        """
        return self.get_block(BlockType.data3D)

    @data3D.setter
    @raise_if_outside_context
    def data3D(self, data: Data3D) -> None:
        self.replace_block(data) if self.has_data3D else self.add_block(data)

    @property
    @provide_context_if_needed
    def has_data3D(self) -> bool:
        """Check if the file has a 3D data block."""
        return any(entry.type == BlockType.data3D for entry in self.entries)

    @property
    @provide_context_if_needed
    def force_and_torque(self) -> Optional[ForceTorque3D]:
        """Convenience property to get/set the force and torque data block."""
        return self.get_block(BlockType.forceAndTorqueData)

    @force_and_torque.setter
    @raise_if_outside_write_context
    def force_and_torque(self, data: ForceTorque3D) -> None:
        self.replace_block(data) if self.has_force_and_torque else self.add_block(data)

    @property
    @provide_context_if_needed
    def has_force_and_torque(self) -> bool:
        """Check if the file has a force and torque data block."""
        return any(entry.type == BlockType.forceAndTorqueData for entry in self.entries)

    @property
    @provide_context_if_needed
    def events(self) -> Optional[TemporalEventsData]:
        """Convenience property to get/set the events data block."""
        return self.get_block(BlockType.temporalEventsData)

    @events.setter
    @raise_if_outside_write_context
    def events(self, data: TemporalEventsData) -> None:
        self.replace_block(data) if self.has_events else self.add_block(data)

    @property
    @provide_context_if_needed
    def has_events(self) -> bool:
        """Check if the TDF file has an events block"""
        return any(i for i in self.entries if i.type == BlockType.temporalEventsData)

    @property
    @provide_context_if_needed
    def emg(self) -> Optional[EMG]:
        """Convenience property to get/set the EMG data block."""
        return self.get_block(EMG.type)

    @emg.setter
    @raise_if_outside_write_context
    def emg(self, data: EMG) -> None:
        self.replace_block(data) if self.has_emg else self.add_block(data)

    @property
    @provide_context_if_needed
    def has_emg(self) -> bool:
        """Check if the TDF file has an EMG block"""
        return any(
            entry.type == BlockType.electromyographicData for entry in self.entries
        )

    @property
    @provide_context_if_needed
    def calibrationData(self) -> Optional[CalibrationDataBlock]:
        """Convenience property to get/set the calibration data block."""
        return self.get_block(CalibrationDataBlock.type)

    @raise_if_outside_write_context
    def add_block(
        self, newBlock: Block, comment: str = "Generated by basicTDF"
    ) -> None:
        """Adds a block to the TDF file

        Args:
            newBlock (Block): the block to be added.
            comment (str, optional): A description for the block entry.
            Defaults to "Generated by basicTDF".

        Raises:
            PermissionError: the TDF is read only
            ValueError: there's already a block of the same type
            ValueError: block limit reached (14 as per BTS's implementation)
            IOError: unused empty blocks in the middle of the file
        """
        if self._mode == "rb":
            raise PermissionError(
                "Can't add blocks, this file was opened in read-only mode"
            )

        # scan the entries directly so the ValueError is not swallowed
        if any(entry.type == newBlock.type for entry in self.entries):
            raise ValueError(
                f"There's already a block of this type {newBlock.type}. "
                "Remove it first"
            )

        # find first unused slot
        try:
            unusedBlockPos = next(
                n for n, i in enumerate(self.entries) if i.type == BlockType.unusedSlot
            )
        except StopIteration:
            raise ValueError(f"Block limit reached ({len(self.entries)})")

        # write new entry with the offset of that unused slot
        new_entry = TdfEntry(
            type=newBlock.type,
            format=newBlock.format.value,
            offset=self.entries[unusedBlockPos].offset,
            size=newBlock.nBytes,
            creation_date=newBlock.creation_date,
            last_modification_date=newBlock.last_modification_date,
            last_access_date=datetime.now(),
            comment=comment,
        )

        # replace the entry
        self.entries[unusedBlockPos] = new_entry

        # write new entry
        self.handler.seek(64 + 288 * unusedBlockPos, 0)
        new_entry._write(self.handler)

        # update the offset of every following unused slot
        for n, entry in enumerate(
            self.entries[unusedBlockPos + 1 :], start=unusedBlockPos + 1
        ):
            if entry.type == BlockType.unusedSlot:
                entry.offset = new_entry.offset + new_entry.size
                self.handler.seek(64 + 288 * n, 0)
                entry._write(self.handler)
            else:
                raise IOError("All unused slots must be at the end of the file")

        # write new block
        self.handler.seek(new_entry.offset, 0)
        newBlock._write(self.handler)

        # ensure the file is the correct size
        # and that the changes are written to disk
        self.handler.flush()

    @raise_if_outside_write_context
    def remove_block(self, type: BlockType) -> None:
        """Remove the block of the given type from the file. Removing a block
        implies:

        - Removing the entry
        - Updating the offset of every subsequent entry
          (subtracting the size of the removed block)
        - Inserting a new unused slot entry at the end
          (with the previous slot offset + size as offset)
        - If there is data after the block, moving it up
          by the size of the removed block

        """
        if "+" not in self._mode:
            raise PermissionError(
                "Can't remove blocks, this file was opened in read-only mode"
            )

        # find block
        try:
            oldEntryPos, oldEntry = next(
                (n, i) for n, i in enumerate(self.entries) if i.type == type
            )
        except StopIteration:
            raise ValueError(f"No block of type {type} found")

        # calculate new offset for the next unused slot
        newOffset = (
            self.entries[-1].offset if oldEntryPos != 0 else (64 + 288 * self.nEntries)
        )

        # delete entry
        self.entries.remove(oldEntry)
        self.handler.seek(64 + 288 * oldEntryPos, 0)
        # update all the offsets of the entries following the removed one
        for entry in self.entries[oldEntryPos:]:
            entry.offset -= oldEntry.size
            entry._write(self.handler)

        # add new unused slot at the end
        date = datetime.now()
        newEntry = TdfEntry(
            type=BlockType.unusedSlot,
            format=0,
            offset=newOffset,
            size=0,
            creation_date=date,
            last_modification_date=date,
            last_access_date=date,
            comment="Generated by basicTDF",
        )
        self.entries.append(newEntry)
        newEntry._write(self.handler)

        self.handler.seek(oldEntry.offset + oldEntry.size, 0)
        temp = self.handler.read()
        self.handler.seek(oldEntry.offset, 0)
        self.handler.write(temp)
        self.handler.truncate()
        self.handler.flush()

    @staticmethod
    def new(filename: str) -> "Tdf":
        """Creates a new TDF file"""
        filePath = Path(filename)
        if filePath.exists():
            raise FileExistsError("File already exists")

        nEntries = 14
        date = datetime.now()
        with filePath.open("wb") as f:
            # signature
            f.write(Tdf.SIGNATURE)
            # version
            i32.bwrite(f, 1)
            # nEntries
            i32.bwrite(f, nEntries)
            # reserved
            i32.bpad(f, 2)
            # creation date
            BTSDate.bwrite(f, date)
            # last modification date
            BTSDate.bwrite(f, date)
            # last access date
            BTSDate.bwrite(f, date)
            # reserved
            i32.bpad(f, 5)

            # start entries
            entryOffset = 64

            # every entry starts with an offset pointing to where the entry table ends
            blockOffset = entryOffset + nEntries * 288

            for _ in range(nEntries):
                # type
                u32.bwrite(f, 0)
                # format
                u32.bwrite(f, 0)
                # offset
                i32.bwrite(f, blockOffset)
                # size
                i32.bwrite(f, 0)
                # creation date
                BTSDate.bwrite(f, date)
                # last modification date
                BTSDate.bwrite(f, date)
                # last access date
                BTSDate.bwrite(f, date)
                # reserved
                i32.bpad(f, 1)
                # comment
                BTSString.bwrite(f, 256, "Generated by basicTDF")

        return Tdf(filePath)

    @raise_if_outside_write_context
    def replace_block(self, newBlock: Block, comment: Optional[str] = None) -> None:
        """Replace a block of the same type with a new one. This is done by
        removing the old block and adding the new one."""

        old_entry = next((i for i in self.entries if i.type == newBlock.type), None)

        if old_entry is None:
            raise ValueError(f"No block of type {newBlock.type} found")

        comment = comment if comment is not None else old_entry.comment

        self.remove_block(newBlock.type)
        self.add_block(newBlock, comment)

    @property
    def nBytes(self) -> int:
        """Return the size of the TDF file in bytes"""
        return self.file_path.stat().st_size

    def __len__(self) -> int:
        """Return the number of blocks in the TDF file
        that are not of type unusedSlot
        """
        return sum(1 for i in self.entries if i.type != BlockType.unusedSlot)

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, Tdf):
            return False
        return (
            self.version == o.version
            and self.nEntries == o.nEntries
            and self.blocks == o.blocks
        )

    @provide_context_if_needed
    def __repr__(self) -> str:
        return f"Tdf({self.file_path}, nEntries={self.nEntries}, nBytes={self.nBytes})"

Class variables

var SIGNATURE

Static methods

def new(filename: str) ‑> Tdf

Creates a new TDF file

Expand source code
@staticmethod
def new(filename: str) -> "Tdf":
    """Creates a new TDF file"""
    filePath = Path(filename)
    if filePath.exists():
        raise FileExistsError("File already exists")

    nEntries = 14
    date = datetime.now()
    with filePath.open("wb") as f:
        # signature
        f.write(Tdf.SIGNATURE)
        # version
        i32.bwrite(f, 1)
        # nEntries
        i32.bwrite(f, nEntries)
        # reserved
        i32.bpad(f, 2)
        # creation date
        BTSDate.bwrite(f, date)
        # last modification date
        BTSDate.bwrite(f, date)
        # last access date
        BTSDate.bwrite(f, date)
        # reserved
        i32.bpad(f, 5)

        # start entries
        entryOffset = 64

        # every entry starts with an offset pointing to where the entry table ends
        blockOffset = entryOffset + nEntries * 288

        for _ in range(nEntries):
            # type
            u32.bwrite(f, 0)
            # format
            u32.bwrite(f, 0)
            # offset
            i32.bwrite(f, blockOffset)
            # size
            i32.bwrite(f, 0)
            # creation date
            BTSDate.bwrite(f, date)
            # last modification date
            BTSDate.bwrite(f, date)
            # last access date
            BTSDate.bwrite(f, date)
            # reserved
            i32.bpad(f, 1)
            # comment
            BTSString.bwrite(f, 256, "Generated by basicTDF")

    return Tdf(filePath)
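
The constants 64 and 288 used throughout Tdf can be cross-checked with a little arithmetic. The sketch below is an inference from the source shown here, not a statement of the TDF specification: it assumes the entry table starts right after the header at byte 64, and from that derives that a BTSDate occupies 4 bytes. The name entry_position is hypothetical.

```python
SIGNATURE = b"\x82K`A\xd3\x11\x84\xca`\x00\xb6\xac\x16h\x0c\x08"
HEADER_SIZE = 64  # where the entry table starts
ENTRY_SIZE = 288  # stride used in seek(64 + 288 * n)
N_ENTRIES = 14  # slots written by Tdf.new


def entry_position(n):
    """Byte position of the n-th entry record in the file."""
    return HEADER_SIZE + ENTRY_SIZE * n


# Header: signature + version + nEntries + 2 reserved ints
# + 3 dates + 5 reserved ints. Solve for the width of one date.
date_size = (HEADER_SIZE - len(SIGNATURE) - 4 - 4 - 2 * 4 - 5 * 4) // 3

# Entry: type, format, offset, size (4 ints), 3 dates,
# 1 reserved int and a 256-byte comment.
entry_size = 4 * 4 + 3 * date_size + 4 + 256

# Offset given to every unused slot of a freshly created file
first_block_offset = entry_position(N_ENTRIES)
```

Under these assumptions the per-entry fields add up to the 288-byte stride, which is a useful sanity check when reading the seek calls in add_block and remove_block.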

Instance variables

var blocks : List[Block]

Get all blocks in the file.

Expand source code
@property
@provide_context_if_needed
def blocks(self) -> List[Block]:
    """Get all blocks in the file."""
    return [self.get_block(entry.type) for entry in self.entries]
var calibrationData : Optional[CalibrationDataBlock]

Convenience property to get/set the calibration data block.

Expand source code
@property
@provide_context_if_needed
def calibrationData(self) -> Optional[CalibrationDataBlock]:
    """Convenience property to get/set the calibration data block."""
    return self.get_block(CalibrationDataBlock.type)
var data3D : Optional[Data3D]

Convenience property to get/set the 3D data block.

Expand source code
@property
@provide_context_if_needed
def data3D(self) -> Optional[Data3D]:
    """
    Convenience property to get/set the 3D data block.
    """
    return self.get_block(BlockType.data3D)
var emg : Optional[EMG]

Convenience property to get/set the EMG data block.

Expand source code
@property
@provide_context_if_needed
def emg(self) -> Optional[EMG]:
    """Convenience property to get/set the EMG data block."""
    return self.get_block(EMG.type)
var events : Optional[TemporalEventsData]

Convenience property to get/set the events data block.

Expand source code
@property
@provide_context_if_needed
def events(self) -> Optional[TemporalEventsData]:
    """Convenience property to get/set the events data block."""
    return self.get_block(BlockType.temporalEventsData)
var force_and_torque : Optional[ForceTorque3D]

Convenience property to get/set the force and torque data block.

Expand source code
@property
@provide_context_if_needed
def force_and_torque(self) -> Optional[ForceTorque3D]:
    """Convenience property to get/set the force and torque data block."""
    return self.get_block(BlockType.forceAndTorqueData)
var has_data3D : bool

Check if the file has a 3D data block.

Expand source code
@property
@provide_context_if_needed
def has_data3D(self) -> bool:
    """Check if the file has a 3D data block."""
    return any(entry.type == BlockType.data3D for entry in self.entries)
var has_emg : bool

Check if the TDF file has an EMG block

Expand source code
@property
@provide_context_if_needed
def has_emg(self) -> bool:
    """Check if the TDF file has an EMG block"""
    return any(
        entry.type == BlockType.electromyographicData for entry in self.entries
    )
var has_events : bool

Check if the TDF file has an events block

Expand source code
@property
@provide_context_if_needed
def has_events(self) -> bool:
    """Check if the TDF file has an events block"""
    return any(i for i in self.entries if i.type == BlockType.temporalEventsData)
var has_force_and_torque : bool

Check if the file has a force and torque data block.

Expand source code
@property
@provide_context_if_needed
def has_force_and_torque(self) -> bool:
    """Check if the file has a force and torque data block."""
    return any(entry.type == BlockType.forceAndTorqueData for entry in self.entries)
var nBytes : int

Return the size of the TDF file in bytes

Expand source code
@property
def nBytes(self) -> int:
    """Return the size of the TDF file in bytes"""
    return self.file_path.stat().st_size

Methods

def add_block(self, newBlock: Block, comment: str = 'Generated by basicTDF') ‑> None

Adds a block to the TDF file

Args

newBlock : Block
the block to be added.
comment : str, optional
A description for the block entry. Defaults to "Generated by basicTDF".

Raises

PermissionError
the TDF is read only
ValueError
there's already a block of the same type
ValueError
block limit reached (14 as per BTS's implementation)
IOError
unused empty blocks in the middle of the file
Expand source code
@raise_if_outside_write_context
def add_block(
    self, newBlock: Block, comment: str = "Generated by basicTDF"
) -> None:
    """Adds a block to the TDF file

    Args:
        newBlock (Block): the block to be added.
        comment (str, optional): A description for the block entry.
        Defaults to "Generated by basicTDF".

    Raises:
        PermissionError: the TDF is read only
        ValueError: there's already a block of the same type
        ValueError: block limit reached (14 as per BTS's implementation)
        IOError: unused empty blocks in the middle of the file
    """
    if self._mode == "rb":
        raise PermissionError(
            "Can't add blocks, this file was opened in read-only mode"
        )

    # scan the entries directly so the ValueError is not swallowed
    if any(entry.type == newBlock.type for entry in self.entries):
        raise ValueError(
            f"There's already a block of this type {newBlock.type}. "
            "Remove it first"
        )

    # find first unused slot
    try:
        unusedBlockPos = next(
            n for n, i in enumerate(self.entries) if i.type == BlockType.unusedSlot
        )
    except StopIteration:
        raise ValueError(f"Block limit reached ({len(self.entries)})")

    # write new entry with the offset of that unused slot
    new_entry = TdfEntry(
        type=newBlock.type,
        format=newBlock.format.value,
        offset=self.entries[unusedBlockPos].offset,
        size=newBlock.nBytes,
        creation_date=newBlock.creation_date,
        last_modification_date=newBlock.last_modification_date,
        last_access_date=datetime.now(),
        comment=comment,
    )

    # replace the entry
    self.entries[unusedBlockPos] = new_entry

    # write new entry
    self.handler.seek(64 + 288 * unusedBlockPos, 0)
    new_entry._write(self.handler)

    # update the offset of every following unused slot
    for n, entry in enumerate(
        self.entries[unusedBlockPos + 1 :], start=unusedBlockPos + 1
    ):
        if entry.type == BlockType.unusedSlot:
            entry.offset = new_entry.offset + new_entry.size
            self.handler.seek(64 + 288 * n, 0)
            entry._write(self.handler)
        else:
            raise IOError("All unused slots must be at the end of the file")

    # write new block
    self.handler.seek(new_entry.offset, 0)
    newBlock._write(self.handler)

    # ensure the file is the correct size
    # and that the changes are written to disk
    self.handler.flush()
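
The slot bookkeeping in add_block can be tried out in memory without touching a file. This is a simplified sketch: Entry is a hypothetical, reduced stand-in for TdfEntry, UNUSED stands in for BlockType.unusedSlot, and the block type numbers 5 and 7 are arbitrary.

```python
from dataclasses import dataclass

UNUSED = 0  # stand-in for BlockType.unusedSlot


@dataclass
class Entry:  # hypothetical, reduced version of a directory entry
    type: int
    offset: int
    size: int = 0


def claim_slot(entries, block_type, block_size):
    """Give the first unused slot to a new block and shift later unused slots."""
    pos = next((n for n, e in enumerate(entries) if e.type == UNUSED), None)
    if pos is None:
        raise ValueError(f"Block limit reached ({len(entries)})")

    # the new block starts where the unused slot pointed
    new_entry = Entry(block_type, entries[pos].offset, block_size)
    entries[pos] = new_entry

    # every remaining unused slot now points just past the new block
    for entry in entries[pos + 1:]:
        if entry.type != UNUSED:
            raise IOError("All unused slots must be at the end of the file")
        entry.offset = new_entry.offset + new_entry.size
    return pos


entries = [Entry(UNUSED, 4096) for _ in range(3)]
first = claim_slot(entries, block_type=5, block_size=1000)
second = claim_slot(entries, block_type=7, block_size=200)
offsets = [e.offset for e in entries]
```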
def allow_write(self) ‑> Tdf

Allow writing to the file.

Expand source code
def allow_write(self) -> "Tdf":
    """Allow writing to the file."""
    self._mode = "r+b"
    return self
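
Because allow_write returns self, it can be called directly in a with statement, and __exit__ resets the mode to "rb", so write permission lasts for a single context. The sketch below reproduces that pattern with a hypothetical stand-in class (ModeSwitchingFile) and a temporary file; it is not the Tdf class itself.

```python
import os
import tempfile


class ModeSwitchingFile:
    """Hypothetical stand-in that mirrors how Tdf tracks its open mode."""

    def __init__(self, path):
        self.path = path
        self._mode = "rb"

    def allow_write(self):
        self._mode = "r+b"
        return self  # returning self lets the call sit inside a with statement

    def __enter__(self):
        self.handler = open(self.path, self._mode)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._mode = "rb"  # write permission lasts for one context only
        self.handler.close()


fd, path = tempfile.mkstemp()
os.close(fd)

f = ModeSwitchingFile(path)
with f.allow_write() as writable:
    mode_inside = writable._mode
    writable.handler.write(b"abc")
mode_after = f._mode

with open(path, "rb") as fh:
    content = fh.read()
os.remove(path)
```

A second with block on the same object would therefore be read-only again unless allow_write is called once more.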
def get_block(self, index_or_type: Union[BlockType, int]) ‑> Block

Get a block from the TDF file, either by its position in the entry table or by its block type.

@provide_context_if_needed
def get_block(self, index_or_type: Union[BlockType, int]) -> Block:
    """Get a block from the TDF file, either by its position in the
    entry table or by its block type."""

    if isinstance(index_or_type, int):
        if 0 <= index_or_type < len(self.entries):
            entry = self.entries[index_or_type]
        else:
            raise IndexError(f"Index {index_or_type} out of range")

    elif isinstance(index_or_type, BlockType):
        entry = next((e for e in self.entries if e.type == index_or_type), None)
        if entry is None:
            raise Exception(f"Block {index_or_type} not found")

    else:
        raise TypeError(f"Expected int or BlockType, got {type(index_or_type)}")

    self.handler.seek(entry.offset, 0)
    block_class = _get_block_class(entry.type)
    return block_class._build(self.handler, entry.format)
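The lookup rules of `get_block` can be tried in isolation with the sketch below. It is not the library's code: `Kind` is a hypothetical stand-in for `BlockType`, `find_entry` is an illustrative name, and the entry table is reduced to a list of types. The sketch raises `LookupError` where the source raises a plain `Exception`.

```python
from enum import Enum
from typing import List, Union


class Kind(Enum):
    """Hypothetical stand-in for basictdf's BlockType."""
    DATA3D = "data3D"
    EVENTS = "events"


def find_entry(entries: List[Kind], index_or_type: Union[Kind, int]) -> Kind:
    """Resolve an entry by table index or by type, like get_block does."""
    if isinstance(index_or_type, int):
        # Negative indices are rejected instead of counting from the end.
        if 0 <= index_or_type < len(entries):
            return entries[index_or_type]
        raise IndexError(f"Index {index_or_type} out of range")
    if isinstance(index_or_type, Kind):
        entry = next((e for e in entries if e == index_or_type), None)
        if entry is None:
            raise LookupError(f"Block {index_or_type} not found")
        return entry
    raise TypeError(f"Expected int or Kind, got {type(index_or_type)}")


entries = [Kind.DATA3D, Kind.EVENTS]
print(find_entry(entries, 1))
print(find_entry(entries, Kind.DATA3D))
```

Note that a lookup by type returns the first matching entry, and that any other key type (a string, for instance) is rejected with `TypeError`.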
def remove_block(self, type: BlockType) ‑> None

Remove a block of the given type from the file. Removing a block implies:

  • Removing the entry
  • Updating the offsets of all subsequent entries (subtracting the size of the removed block)
  • Inserting a new unused slot entry at the end (with the previous slot offset + size as offset)
  • If there is data after the block, moving it up by the size of the removed block

@raise_if_outside_write_context
def remove_block(self, type: BlockType) -> None:
    """Remove a block of the given type from the file. Removing a block
    implies:

    - Removing the entry
    - Updating the offsets of all subsequent entries
      (subtracting the size of the removed block)
    - Inserting a new unused slot entry at the end
      (with the previous slot offset + size as offset)
    - If there is data after the block, moving it up
      by the size of the removed block
    """
    if "+" not in self._mode:
        raise PermissionError(
            "Can't remove blocks, this file was opened in read-only mode"
        )

    # find block
    try:
        oldEntryPos, oldEntry = next(
            (n, i) for n, i in enumerate(self.entries) if i.type == type
        )
    except StopIteration:
        raise ValueError(f"No block of type {type} found")

    # calculate new offset for the next unused slot
    newOffset = (
        self.entries[-1].offset if oldEntryPos != 0 else (64 + 288 * self.nEntries)
    )

    # delete entry
    self.entries.remove(oldEntry)
    self.handler.seek(64 + 288 * oldEntryPos, 0)
    # shift the offsets of the entries that followed the removed one
    for entry in self.entries[oldEntryPos:]:
        entry.offset -= oldEntry.size
        entry._write(self.handler)

    # add new unused slot at the end
    date = datetime.now()
    newEntry = TdfEntry(
        type=BlockType.unusedSlot,
        format=0,
        offset=newOffset,
        size=0,
        creation_date=date,
        last_modification_date=date,
        last_access_date=date,
        comment="Generated by basicTDF",
    )
    self.entries.append(newEntry)
    newEntry._write(self.handler)

    self.handler.seek(oldEntry.offset + oldEntry.size, 0)
    temp = self.handler.read()
    self.handler.seek(oldEntry.offset, 0)
    self.handler.write(temp)
    self.handler.truncate()
    self.handler.flush()
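The last step of `remove_block` (read the tail, write it back over the removed block, truncate) can be sketched on an in-memory stream. This is a simplified illustration, not the library's implementation: `cut_bytes` is a hypothetical helper and the byte strings are made-up data.

```python
import io
from typing import BinaryIO


def cut_bytes(stream: BinaryIO, offset: int, size: int) -> None:
    """Remove `size` bytes starting at `offset`, shifting the tail up."""
    stream.seek(offset + size, 0)
    tail = stream.read()      # everything stored after the removed block
    stream.seek(offset, 0)
    stream.write(tail)        # overwrite the removed block with the tail
    stream.truncate()         # cut the file at the current position
    stream.flush()


buf = io.BytesIO(b"HEADblock1BLOCK2tail")
cut_bytes(buf, offset=4, size=6)   # drop the 6 bytes of b"block1"
print(buf.getvalue())
```

Because the whole tail is read into memory at once, this approach is simple but its memory use grows with the amount of data stored after the removed block.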
def replace_block(self, newBlock: Block, comment: Optional[str] = None) ‑> None

Replace a block of the same type with a new one. This is done by removing the old block and adding the new one.

@raise_if_outside_write_context
def replace_block(self, newBlock: Block, comment: Optional[str] = None) -> None:
    """Replace a block of the same type with a new one. This is done by
    removing the old block and adding the new one."""

    old_entry = next((i for i in self.entries if i.type == newBlock.type), None)

    if old_entry is None:
        raise ValueError(f"No block of type {newBlock.type} found")

    comment = comment if comment is not None else old_entry.comment

    self.remove_block(newBlock.type)
    self.add_block(newBlock, comment)
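One detail of `replace_block` worth isolating is the comment fallback: the test is `is not None`, so an explicitly empty comment replaces the old one, while omitting the argument keeps it. A minimal sketch, where `resolve_comment` is a hypothetical helper name:

```python
from typing import Optional


def resolve_comment(new: Optional[str], old: str) -> str:
    """Keep the old comment only when no new one was given."""
    return new if new is not None else old


print(repr(resolve_comment(None, "Generated by basicTDF")))
print(repr(resolve_comment("", "Generated by basicTDF")))
```

Writing `new or old` instead would silently discard an empty string, which is presumably why the explicit `None` check is used.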
class TemporalEventsData (format=TemporalEventsDataFormat.standard, start_time=0.0)

The Events data block. Stores the temporal events data.

class TemporalEventsData(Block):
    """
    The Events data block. Stores the temporal events data.
    """

    type = BlockType.temporalEventsData

    def __init__(self, format=TemporalEventsDataFormat.standard, start_time=0.0):
        super().__init__()
        self.format = format
        self.start_time = start_time
        self.events = []

    @staticmethod
    def _build(stream, format) -> "TemporalEventsData":
        format = TemporalEventsDataFormat(format)
        nEvents = i32.bread(stream)
        start_time = f32.bread(stream)

        t = TemporalEventsData(format, start_time)
        t.events = [Event._build(stream) for _ in range(nEvents)]

        return t

    def _write(self, stream) -> None:
        i32.bwrite(stream, len(self.events))
        f32.bwrite(stream, self.start_time)
        for event in self.events:
            event._write(stream)

    @property
    def nBytes(self) -> int:
        return 4 + 4 + sum(i.nBytes for i in self.events)

    def __len__(self) -> int:
        return len(self.events)

    def __getitem__(self, item: Union[int, str]) -> Event:
        """
        Returns the event with the given index or label.

        Args:
            item (Union[int, str]): index or label of the event

        Raises:
            KeyError: if the event with the given label is not found
            TypeError: if the key type is not int or str
        """
        if isinstance(item, int):
            return self.events[item]
        elif isinstance(item, str):
            try:
                return next(e for e in self.events if e.label == item)
            except StopIteration:
                raise KeyError(f"Event with label {item} not found")
        raise TypeError(f"Invalid key type: {type(item)}")

    def __iter__(self) -> Iterator[Event]:
        return iter(self.events)

    def __contains__(self, value: Union[Event, str]) -> bool:
        if isinstance(value, Event):
            return value in self.events
        elif isinstance(value, str):
            return any(value == event.label for event in self.events)
        raise TypeError(f"Invalid key type: {type(value)}")

    def __eq__(self, other) -> bool:
        if not isinstance(other, TemporalEventsData):
            return False
        # compare the lists directly: zip() alone stops at the shorter list,
        # so blocks with a different number of events could compare equal
        return (
            self.format == other.format
            and self.start_time == other.start_time
            and self.events == other.events
        )

    def __repr__(self) -> str:
        return (
            "<TemporalEventsData "
            f"format={self.format.name} "
            f"nEvents={len(self.events)} "
            f"start_time={self.start_time} "
            f"events={self.events} "
            f"size={self.nBytes}>"
        )
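The index-or-label access rules of the class can be exercised without a TDF file using the stand-in below. It is a sketch under stated assumptions: `FakeEvent` and `EventList` are hypothetical names, and `FakeEvent` only models the `label` attribute that the lookups rely on.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class FakeEvent:
    """Hypothetical stand-in for basictdf's Event; only `label` matters here."""
    label: str
    values: List[float] = field(default_factory=list)


class EventList:
    """Minimal container reproducing the lookup rules shown above."""

    def __init__(self, events: List[FakeEvent]) -> None:
        self.events = list(events)

    def __getitem__(self, item: Union[int, str]) -> FakeEvent:
        if isinstance(item, int):
            return self.events[item]  # plain list indexing
        if isinstance(item, str):
            try:
                return next(e for e in self.events if e.label == item)
            except StopIteration:
                raise KeyError(f"Event with label {item} not found") from None
        raise TypeError(f"Invalid key type: {type(item)}")

    def __contains__(self, value: Union[FakeEvent, str]) -> bool:
        if isinstance(value, FakeEvent):
            return value in self.events
        if isinstance(value, str):
            return any(value == e.label for e in self.events)
        raise TypeError(f"Invalid value type: {type(value)}")


events = EventList([FakeEvent("heel strike", [0.5]), FakeEvent("toe off", [0.9])])
print(events["toe off"].values)
print("heel strike" in events)
```

If two events share a label, the label lookup returns the first one in the list.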

Ancestors

  • Block
  • basictdf.tdfBlock.Sized
  • basictdf.tdfBlock.BuildWriteable
  • abc.ABC

Class variables

var type

Instance variables

var nBytes : int

Size in bytes

@property
def nBytes(self) -> int:
    return 4 + 4 + sum(i.nBytes for i in self.events)
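The two leading `4`s correspond to the fields written first by `_write`: a 32-bit event count and a 32-bit float start time. The sketch below checks that arithmetic with the standard `struct` module. The little-endian byte order in `HEADER_FORMAT` is an assumption, `events_block_size` is a hypothetical helper, and the per-event sizes are made-up numbers.

```python
import struct
from typing import Iterable

# Assumption: little-endian int32 event count followed by a float32 start time.
HEADER_FORMAT = "<if"


def events_block_size(event_sizes: Iterable[int]) -> int:
    """Total block size: 4 bytes (count) + 4 bytes (start time) + events."""
    return 4 + 4 + sum(event_sizes)


header = struct.pack(HEADER_FORMAT, 3, 0.0)
print(len(header))
print(events_block_size([20, 36]))
```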