Package basictdf
basictdf is a read and write parser for the BTS Bioengineering TDF file format. This format is tipically used as storage of raw data from a BTS motion capture acquisition system (e.g. raw EMG, 2D raw marker data) but can also serve as storage for processed data (e.g. 3D reconstructed marker data, filtered EMG signals, events).
How to read a TDF file?
from basictdf import Tdf
# You can use a context manager to automatically close the file
with Tdf("path/to/file.tdf") as tdf:
data3D = tdf.data3D
# Or you can read individual blocks.
data3D = Tdf("path/to/file.tdf").data3D
How to modify a TDF file?
The propper way to modify a TDF file is through a context manager.
This way, the file is guaranteed to be closed properly.
Specifying the mode as "r+b" (read and write) is also required
using the Tdf.allow_write()
method.
You can't write to a tdf file like this.
# Don't do this. This will fail
Tdf("path/to/file.tdf").data3D = oldData3D
Use a context manager instead
# Do this instead
with Tdf("path/to/file.tdf").allow_write() as tdf:
oldData3D = tdf.data3D
# Let's add 1 to the X coordinate of the c7 marker
oldData3D["c7"].X = oldData3D["c7"].X + 1
tdf.data3D = oldData3D
# This is fine too
tdf = Tdf("path/to/file.tdf")
with tdf.allow_write() as tdf:
tdf.data3D = oldData3D
How to add a new block to a TDF file?
How to create a TDF from scratch?
The easiest way to add a new block to a TDF is actually to modify an existing
block with the same type. However, if you want to add a completely new block,
you can use any of the Block
subclasses, like
Data3D
or TemporalEventsData
.
from basictdf import Tdf, Data3D, MarkerTrack
import numpy as np
# Create an empty data3D block
data3D = Data3D(
frequency=1000,
nFrames=1000,
volume=np.array([1, 1, 1]),
translationVector=np.array([0, 0, 0]),
rotationMatrix=np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]]),
)
# Create a bogus marker track with random data
c7 = MarkerTrack("c7", np.random.rand(1000, 3))
# Add it to the data3D block
data3D.add_track(c7)
# Write it to a new TDF file
with Tdf.new("my_file.tdf").allow_write() as tdf:
tdf.data3D = data3D
# This works too
tdf = Tdf.new("my_file.tdf")
with tdf.allow_write() as tdf:
tdf.data3D = data3D
Expand source code
from __future__ import annotations
__doc__ = """
basictdf is a **read** and **write** parser for the BTS Bioengineering TDF file
format.
This format is tipically used as storage of raw data from a BTS motion capture
acquisition system (e.g. raw EMG, 2D raw marker data) but can also serve as
storage for processed data
(e.g. 3D reconstructed marker data, filtered EMG signals, events).
## How to read a TDF file?
```python
from basictdf import Tdf
# You can use a context manager to automatically close the file
with Tdf("path/to/file.tdf") as tdf:
data3D = tdf.data3D
# Or you can read individual blocks.
data3D = Tdf("path/to/file.tdf").data3D
```
## How to modify a TDF file?
The propper way to modify a TDF file is through a context manager.
This way, the file is guaranteed to be closed properly.
Specifying the mode as "r+b" (read and write) is also required
using the `basictdf.Tdf.allow_write` method.
You **can't** write to a tdf file like this.
```python
# Don't do this. This will fail
Tdf("path/to/file.tdf").data3D = oldData3D
```
Use a context manager instead
```python
# Do this instead
with Tdf("path/to/file.tdf").allow_write() as tdf:
oldData3D = tdf.data3D
# Let's add 1 to the X coordinate of the c7 marker
oldData3D["c7"].X = oldData3D["c7"].X + 1
tdf.data3D = oldData3D
# This is fine too
tdf = Tdf("path/to/file.tdf")
with tdf.allow_write() as tdf:
tdf.data3D = oldData3D
```
## How to add a new block to a TDF file?
## How to create a TDF from scratch?
The easiest way to add a new block to a TDF is actually to modify an existing
block with the same type. However, if you want to add a completely new block,
you can use any of the `basictdf.tdfBlock.Block` subclasses, like
`basictdf.tdfData3D.Data3D` or `basictdf.tdfEvents.TemporalEventsData`.
```python
from basictdf import Tdf, Data3D, MarkerTrack
import numpy as np
# Create an empty data3D block
data3D = Data3D(
frequency=1000,
nFrames=1000,
volume=np.array([1, 1, 1]),
translationVector=np.array([0, 0, 0]),
rotationMatrix=np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]]),
)
# Create a bogus marker track with random data
c7 = MarkerTrack("c7", np.random.rand(1000, 3))
# Add it to the data3D block
data3D.add_track(c7)
# Write it to a new TDF file
with Tdf.new("my_file.tdf").allow_write() as tdf:
tdf.data3D = data3D
# This works too
tdf = Tdf.new("my_file.tdf")
with tdf.allow_write() as tdf:
tdf.data3D = data3D
```
"""
from basictdf.basictdf import Tdf
from basictdf.tdfData3D import Data3D, MarkerTrack
from basictdf.tdfEMG import EMG, EMGTrack
from basictdf.tdfEvents import Event, EventsDataType, TemporalEventsData
__all__ = [
"Tdf",
"Data3D",
"MarkerTrack",
"TemporalEventsData",
"Event",
"EventsDataType",
"EMG",
"EMGTrack",
"ForcePlatformsDataBlock",
"ForcePlatformData",
]
__pdoc__ = {}
__pdoc__["basictdf.tdfUtils"] = False
__pdoc__["basictdf.tdfTypes"] = False
__pdoc__["collections.ABC"] = False
Sub-modules
basictdf.basictdf
basictdf.tdfBlock
-
Block and block type classes.
basictdf.tdfCalibrationData
-
The TDF Calibration Data block
basictdf.tdfData2D
-
Data2D module
basictdf.tdfData3D
-
Marker data module.
basictdf.tdfEMG
-
EMG data module …
basictdf.tdfEvents
-
Events data module.
basictdf.tdfForce3D
-
Force, torque and acceleration data module.
basictdf.tdfForcePlatformsCalibration
-
Force platform calibration data module.
basictdf.tdfForcePlatformsData
-
Force platform data module.
basictdf.tdfOpticalSystem
-
Optical System Configuration Data module.
Classes
class Data3D (frequency: float, nFrames: int, volume: numpy.ndarray, rotationMatrix: numpy.ndarray, translationVector: numpy.ndarray, startTime: float = 0.0, flag: Flags = Flags.rawData, format: Data3dBlockFormat = Data3dBlockFormat.byTrack)
-
A class to represent a TDF block.
A data block that contains marker tracks.
Args
frequency
:float
- data frequency in Hz
nFrames
:int
- number of frames in the data block
volume
:Union[Volume,np.ndarray]
- acquisition volume
rotationMatrix
:np.ndarray
- rotation matrix
translationVector
:np.ndarray
- translation vector
startTime
:float
, optional- Acquisition start time. Defaults to 0.0.
flag
:Flags
, optional- Data 3D block flags. Defaults to Flags.rawData.
format
:Data3dBlockFormat
, optional- Data3D format.
Defaults to Data3dBlockFormat.byTrack.
Raises
ValueError
- the volume is not an array of 3 floats
ValueError
- the rotation matrix is not an array of 3x3 floats
ValueError
- the translation vector is not an array of 3 floats
Expand source code
class Data3D(Block): type = BlockType.data3D def __init__( self, frequency: float, nFrames: int, volume: np.ndarray, rotationMatrix: np.ndarray, translationVector: np.ndarray, startTime: float = 0.0, flag: Flags = Flags.rawData, format: Data3dBlockFormat = Data3dBlockFormat.byTrack, ) -> None: """A data block that contains marker tracks. Args: frequency (float): data frequency in Hz nFrames (int): number of frames in the data block volume (Union[Volume,np.ndarray]): acquisition volume rotationMatrix (np.ndarray): rotation matrix translationVector (np.ndarray): translation vector startTime (float, optional): Acquisition start time. Defaults to 0.0. flag (Flags, optional): Data 3D block flags. Defaults to Flags.rawData. format (Data3dBlockFormat, optional): Data3D format. Defaults to Data3dBlockFormat.byTrack. Raises: ValueError: the volume is not an array of 3 floats ValueError: the rotation matrix is not an array of 3x3 floats ValueError: the translation vector is not an array of 3 floats """ super().__init__() self.format = format self.frequency = frequency self.startTime = startTime if not is_nparray_of_shape(rotationMatrix, MAT3X3F.btype.shape): raise ValueError( f"rotationMatrix must be a numpy array of shape {MAT3X3F.btype.shape}" ) self.rotationMatrix = rotationMatrix if not is_nparray_of_shape(translationVector, VEC3F.btype.shape): raise ValueError( f"translationVector must be a numpy array of shape {VEC3F.btype}" ) self.translationVector = translationVector if not is_nparray_of_shape(volume, Volume.btype.shape): raise ValueError( f"volume must be a numpy array of shape {Volume.btype.shape}" ) self.volume = volume if not isinstance(flag, Flags): raise TypeError("flag must be of type Flags") self.flag = flag if not is_integer(nFrames): raise TypeError("nFrames must be an integer") self.nFrames = nFrames self._tracks = [] def add_track(self, track: MarkerTrack) -> None: """Adds a track to the data block Args: track (MarkerTrack): track to add Raises: TypeError: Track is not of type MarkerTrack ValueError: Track has a different number of frames than the data block """ if not isinstance(track, MarkerTrack): raise TypeError("Track must be of type Track") if track.nFrames != self.nFrames: raise ValueError( ( f"Track with label {track.label} has" f" {track.nFrames} frames, expected {self.nFrames} frames" ) ) self._tracks.append(track) @property def tracks(self) -> List[MarkerTrack]: """Returns a list of all tracks in the data block Returns: List[MarkerTrack]: list of all tracks in the data block """ return self._tracks @tracks.setter def tracks(self, values: Iterable[MarkerTrack]) -> None: """ Sets the tracks in the data block. """ oldTracks = self._tracks self._tracks = [] try: for value in values: self.add_track(value) except Exception as e: self._tracks = oldTracks raise e @staticmethod def _build(stream, format) -> "Data3D": format = Data3dBlockFormat(format) nFrames = i32.bread(stream) frequency = i32.bread(stream) startTime = f32.bread(stream) nTracks = u32.bread(stream) volume = Volume.bread(stream) rotationMatrix = MAT3X3F.bread(stream) translationVector = VEC3F.bread(stream) flag = Flags(u32.bread(stream)) d = Data3D( frequency, nFrames, volume, rotationMatrix, translationVector, startTime, flag, format, ) if format in [Data3dBlockFormat.byTrack, Data3dBlockFormat.byFrame]: nLinks = i32.bread(stream) i32.skip(stream, 1) d.links = LinkType.bread(stream, nLinks) if format in [ Data3dBlockFormat.byTrack, Data3dBlockFormat.byTrackWithoutLinks, ]: d._tracks = [MarkerTrack._build(stream, nFrames) for _ in range(nTracks)] else: raise NotImplementedError(f"Data3D format {format} not implemented yet") return d def __getitem__(self, key: Union[int, str]) -> MarkerTrack: if isinstance(key, int): return self._tracks[key] elif isinstance(key, str): try: return next(track for track in self._tracks if track.label == key) except StopIteration: raise KeyError(f"Track with label {key} not found") raise TypeError(f"Invalid key type {type(key)}") def __iter__(self) -> Iterator[MarkerTrack]: return iter(self._tracks) def __len__(self) -> int: return len(self._tracks) def __eq__(self, other) -> bool: buff1 = BytesIO() buff2 = BytesIO() self._write(buff1) other._write(buff2) return buff1.getvalue() == buff2.getvalue() def __contains__(self, value: Union[MarkerTrack, str]) -> bool: if isinstance(value, MarkerTrack): return value in self._tracks elif isinstance(value, str): return any(track.label == value for track in self._tracks) raise TypeError(f"Invalid value type {type(value)}") @property def nTracks(self) -> int: """Number of tracks in the data block Returns: int: number of tracks """ return len(self._tracks) def _write(self, file: BinaryIO) -> None: if self.format not in [ Data3dBlockFormat.byTrack, Data3dBlockFormat.byTrackWithoutLinks, ]: raise NotImplementedError( f"Data3D format {self.format} not implemented yet" ) # nFrames i32.bwrite(file, self.nFrames) # frequency i32.bwrite(file, self.frequency) # startTime f32.bwrite(file, self.startTime) # nTracks u32.bwrite(file, len(self._tracks)) # volume Volume.bwrite(file, self.volume) # rotationMatrix MAT3X3F.bwrite(file, self.rotationMatrix) # translationVector VEC3F.bwrite(file, self.translationVector) # flags u32.bwrite(file, self.flag.value) if self.format in [ Data3dBlockFormat.byFrame, Data3dBlockFormat.byTrack, ]: links = self.links if hasattr(self, "links") else [] nLinks = len(links) # nLinks i32.bwrite(file, nLinks) # padding i32.bpad(file) # links LinkType.bwrite(file, links) for track in self._tracks: track._write(file) @property def nBytes(self) -> int: base = ( 4 # nFrames + 4 # frequency + 4 # startTime + 4 # nTracks + Volume.btype.itemsize # volume + MAT3X3F.btype.itemsize # rotationMatrix + VEC3F.btype.itemsize # translationVector + 4 # flags ) if self.format in [ Data3dBlockFormat.byFrame, Data3dBlockFormat.byTrack, ]: links_size = ( 4 + 4 + ( LinkType.btype.itemsize * len(self.links) if hasattr(self, "links") else 0 ) ) base += links_size for track in self._tracks: base += track.nBytes return base def __repr__(self) -> str: return ( f"<Data3D " f"format={self.format.name}, " f"nFrames={self.nFrames}, " f"frequency={self.frequency}, " f"startTime={self.startTime}, " f"nTracks={self.nTracks}, " f"tracks={[i.label for i in self._tracks]}>" )
Ancestors
- Block
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Class variables
var type
Instance variables
var nBytes : int
-
Size in bytes
Expand source code
@property def nBytes(self) -> int: base = ( 4 # nFrames + 4 # frequency + 4 # startTime + 4 # nTracks + Volume.btype.itemsize # volume + MAT3X3F.btype.itemsize # rotationMatrix + VEC3F.btype.itemsize # translationVector + 4 # flags ) if self.format in [ Data3dBlockFormat.byFrame, Data3dBlockFormat.byTrack, ]: links_size = ( 4 + 4 + ( LinkType.btype.itemsize * len(self.links) if hasattr(self, "links") else 0 ) ) base += links_size for track in self._tracks: base += track.nBytes return base
var nTracks : int
-
Number of tracks in the data block
Returns
int
- number of tracks
Expand source code
@property def nTracks(self) -> int: """Number of tracks in the data block Returns: int: number of tracks """ return len(self._tracks)
var tracks : List[MarkerTrack]
-
Returns a list of all tracks in the data block
Returns
List[MarkerTrack]
- list of all tracks in the data block
Expand source code
@property def tracks(self) -> List[MarkerTrack]: """Returns a list of all tracks in the data block Returns: List[MarkerTrack]: list of all tracks in the data block """ return self._tracks
Methods
def add_track(self, track: MarkerTrack) ‑> None
-
Adds a track to the data block
Args
track
:MarkerTrack
- track to add
Raises
TypeError
- Track is not of type MarkerTrack
ValueError
- Track has a different number of frames than the data block
Expand source code
def add_track(self, track: MarkerTrack) -> None: """Adds a track to the data block Args: track (MarkerTrack): track to add Raises: TypeError: Track is not of type MarkerTrack ValueError: Track has a different number of frames than the data block """ if not isinstance(track, MarkerTrack): raise TypeError("Track must be of type Track") if track.nFrames != self.nFrames: raise ValueError( ( f"Track with label {track.label} has" f" {track.nFrames} frames, expected {self.nFrames} frames" ) ) self._tracks.append(track)
class EMG (frequency, nSamples, startTime=0.0, format=EMGBlockFormat.byTrack)
-
Electromyography data block
Expand source code
class EMG(Block): """Electromyography data block""" type = BlockType.electromyographicData def __init__( self, frequency, nSamples, startTime=0.0, format=EMGBlockFormat.byTrack ) -> None: super().__init__() self.frequency = frequency self.startTime = startTime self.nSamples = nSamples self._signals = [] self._emgMap = [] self.format = format @staticmethod def _build(stream, format) -> "EMG": format = EMGBlockFormat(format) nSignals = i32.bread(stream) frequency = i32.bread(stream) startTime = f32.bread(stream) nSamples = i32.bread(stream) + 49 # Why 49??? Whyyyy???? emgMap = i16.bread(stream, n=nSignals) d = EMG(frequency, nSamples, startTime, format) if format == EMGBlockFormat.byTrack: for n in range(nSignals): emgSignal = EMGTrack._build(stream, nSamples) d.addSignal(emgSignal, channel=emgMap[n]) else: raise NotImplementedError(f"EMG format {format} not implemented yet") return d def _write(self, file) -> None: if self.format != EMGBlockFormat.byTrack: raise NotImplementedError(f"EMG format {self.format} not implemented yet") # nSignals i32.bwrite(file, len(self._signals)) # frequency i32.bwrite(file, self.frequency) # startTime f32.bwrite(file, self.startTime) # nSamples i32.bwrite(file, self.nSamples - 49) # That 49 again # emgMap i16.bwrite(file, self._emgMap) # signals for signal in self._signals: signal._write(file) def __getitem__(self, key) -> EMGTrack: """ Returns the EMG signal specified by its label or its channel Args: key (int or str): The channel or the label of the signal Raises: TypeError: If the key is not an int or a str KeyError: If the signal is not found """ if isinstance(key, int): return self._signals[key] elif isinstance(key, str): try: return next(signal for signal in self._signals if signal.label == key) except StopIteration: raise KeyError(f"EMG signal with label {key} not found") raise TypeError(f"Invalid key type {type(key)}") def __contains__(self, value: Union[EMGTrack, str]) -> bool: if isinstance(value, str): return any(signal.label == value for signal in self._signals) elif isinstance(value, EMGTrack): return value in self._signals raise TypeError(f"Invalid value type {type(value)}") def __iter__(self) -> Iterator[EMGTrack]: return iter(self._signals) def __len__(self) -> int: return len(self._signals) def __eq__(self, other) -> bool: if not isinstance(other, EMG): return False return ( self.frequency == other.frequency and self.startTime == other.startTime and self.nSamples == other.nSamples and all(s1 == s2 for s1, s2 in zip(self._signals, other._signals)) ) def addSignal(self, signal: EMGTrack, channel=None) -> None: """ Adds a signal to the EMG block. If the channel is not specified, it is set to the next one available Args: signal (EMGTrack): The signal to add channel (int, optional): The channel to use. Defaults to None. Raises: TypeError: If the signal is not an EMGTrack ValueError: If the signal has a different number of samples ValueError: If the channel is already in use """ if not isinstance(signal, EMGTrack): raise TypeError(f"Can only add EMGTrack objects, got {type(signal)}") if signal.nSamples != self.nSamples: raise ValueError( ( f"EMGTrack with label {signal.label} has {signal.nSamples} " f"samples, expected {self.nSamples}" ) ) if channel is None: if len(self._emgMap) == 0: next_channel = 0 else: next_channel = max(self._emgMap) + 1 self._emgMap.append(next_channel) else: if channel in self._emgMap: raise ValueError(f"Channel {channel} already in use") self._emgMap.append(channel) self._signals.append(signal) def removeSignal(self, label: str) -> None: """ Removes a signal specified by its label from the EMG block Args: label (str): The label of the signal to remove Raises: KeyError: If the signal is not found """ try: pos = next(i for i, v in enumerate(self._signals) if v == label) except StopIteration: raise KeyError(f"EMG signal with label {label} not found") del self._signals[pos] del self._emgMap[pos] @property def nBytes(self) -> int: base = 4 + 4 + 4 + 2 * len(self._signals) + 4 for signal in self._signals: base += signal.nBytes return base @property def nSignals(self) -> int: return len(self._signals) def __repr__(self) -> str: return ( "<EMGBlock" f" format={self.format.name}" f" frequency={self.frequency}" f" nSamples={self.nSamples}" f" nSignals={self.nSignals}" f" startTime={self.startTime}" ">" )
Ancestors
- Block
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Class variables
var type
Instance variables
var nBytes : int
-
Size in bytes
Expand source code
@property def nBytes(self) -> int: base = 4 + 4 + 4 + 2 * len(self._signals) + 4 for signal in self._signals: base += signal.nBytes return base
var nSignals : int
-
Expand source code
@property def nSignals(self) -> int: return len(self._signals)
Methods
def addSignal(self, signal: EMGTrack, channel=None) ‑> None
-
Adds a signal to the EMG block. If the channel is not specified, it is set to the next one available
Args
signal
:EMGTrack
- The signal to add
channel
:int
, optional- The channel to use. Defaults to None.
Raises
TypeError
- If the signal is not an EMGTrack
ValueError
- If the signal has a different number of samples
ValueError
- If the channel is already in use
Expand source code
def addSignal(self, signal: EMGTrack, channel=None) -> None: """ Adds a signal to the EMG block. If the channel is not specified, it is set to the next one available Args: signal (EMGTrack): The signal to add channel (int, optional): The channel to use. Defaults to None. Raises: TypeError: If the signal is not an EMGTrack ValueError: If the signal has a different number of samples ValueError: If the channel is already in use """ if not isinstance(signal, EMGTrack): raise TypeError(f"Can only add EMGTrack objects, got {type(signal)}") if signal.nSamples != self.nSamples: raise ValueError( ( f"EMGTrack with label {signal.label} has {signal.nSamples} " f"samples, expected {self.nSamples}" ) ) if channel is None: if len(self._emgMap) == 0: next_channel = 0 else: next_channel = max(self._emgMap) + 1 self._emgMap.append(next_channel) else: if channel in self._emgMap: raise ValueError(f"Channel {channel} already in use") self._emgMap.append(channel) self._signals.append(signal)
def removeSignal(self, label: str) ‑> None
-
Removes a signal specified by its label from the EMG block
Args
label
:str
- The label of the signal to remove
Raises
KeyError
- If the signal is not found
Expand source code
def removeSignal(self, label: str) -> None: """ Removes a signal specified by its label from the EMG block Args: label (str): The label of the signal to remove Raises: KeyError: If the signal is not found """ try: pos = next(i for i, v in enumerate(self._signals) if v == label) except StopIteration: raise KeyError(f"EMG signal with label {label} not found") del self._signals[pos] del self._emgMap[pos]
class EMGTrack (label: str, trackData: numpy.ndarray)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class EMGTrack(Sized, BuildWriteable): def __init__(self, label: str, trackData: np.ndarray) -> None: self.label = label self.data = trackData @property def nSamples(self) -> int: """ Returns: int: number of samples of the track """ return self.data.shape[0] @property def _segments(self): maskedTrackData = np.ma.masked_invalid(self.data) return np.ma.clump_unmasked(maskedTrackData.T) @staticmethod def _build(stream, nSamples) -> "EMGTrack": label = BTSString.bread(stream, 256) nSegments = i32.bread(stream) i32.skip(stream) # padding segmentData = SegmentData.bread(stream, nSegments) trackData = np.empty(nSamples, dtype="<f4") trackData[:] = np.nan for startFrame, nFrames in segmentData: trackData[startFrame : startFrame + nFrames] = f32.bread(stream, nFrames) return EMGTrack(label, trackData) def _write(self, file) -> None: # label BTSString.bwrite(file, 256, self.label) segments = self._segments # nSegments i32.bwrite(file, len(segments)) # padding i32.bpad(file, 1) for segment in segments: # startFrame i32.bwrite(file, segment.start) # nFrames i32.bwrite(file, segment.stop - segment.start) for segment in segments: # data f32.bwrite(file, self.data[segment]) @property def nBytes(self): base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize return base def __eq__(self, other): return self.label == other.label and np.all(self.data == other.data) def __repr__(self) -> str: return ( f"EMGTrack(label={self.label}, nSamples={self.nSamples}," f"segments={len(self._segments)})" )
Ancestors
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Instance variables
var nBytes
-
Size in bytes
Expand source code
@property def nBytes(self): base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize return base
var nSamples : int
-
Returns
int
- number of samples of the track
Expand source code
@property def nSamples(self) -> int: """ Returns: int: number of samples of the track """ return self.data.shape[0]
class ForcePlatformsDataBlock (label: str, trackData: numpy.ndarray)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class EMGTrack(Sized, BuildWriteable): def __init__(self, label: str, trackData: np.ndarray) -> None: self.label = label self.data = trackData @property def nSamples(self) -> int: """ Returns: int: number of samples of the track """ return self.data.shape[0] @property def _segments(self): maskedTrackData = np.ma.masked_invalid(self.data) return np.ma.clump_unmasked(maskedTrackData.T) @staticmethod def _build(stream, nSamples) -> "EMGTrack": label = BTSString.bread(stream, 256) nSegments = i32.bread(stream) i32.skip(stream) # padding segmentData = SegmentData.bread(stream, nSegments) trackData = np.empty(nSamples, dtype="<f4") trackData[:] = np.nan for startFrame, nFrames in segmentData: trackData[startFrame : startFrame + nFrames] = f32.bread(stream, nFrames) return EMGTrack(label, trackData) def _write(self, file) -> None: # label BTSString.bwrite(file, 256, self.label) segments = self._segments # nSegments i32.bwrite(file, len(segments)) # padding i32.bpad(file, 1) for segment in segments: # startFrame i32.bwrite(file, segment.start) # nFrames i32.bwrite(file, segment.stop - segment.start) for segment in segments: # data f32.bwrite(file, self.data[segment]) @property def nBytes(self): base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize return base def __eq__(self, other): return self.label == other.label and np.all(self.data == other.data) def __repr__(self) -> str: return ( f"EMGTrack(label={self.label}, nSamples={self.nSamples}," f"segments={len(self._segments)})" )
Ancestors
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Instance variables
var nBytes
-
Size in bytes
Expand source code
@property def nBytes(self): base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize return base
var nSamples : int
-
Returns
int
- number of samples of the track
Expand source code
@property def nSamples(self) -> int: """ Returns: int: number of samples of the track """ return self.data.shape[0]
class ForcePlatformData (label: str, trackData: numpy.ndarray)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class EMGTrack(Sized, BuildWriteable): def __init__(self, label: str, trackData: np.ndarray) -> None: self.label = label self.data = trackData @property def nSamples(self) -> int: """ Returns: int: number of samples of the track """ return self.data.shape[0] @property def _segments(self): maskedTrackData = np.ma.masked_invalid(self.data) return np.ma.clump_unmasked(maskedTrackData.T) @staticmethod def _build(stream, nSamples) -> "EMGTrack": label = BTSString.bread(stream, 256) nSegments = i32.bread(stream) i32.skip(stream) # padding segmentData = SegmentData.bread(stream, nSegments) trackData = np.empty(nSamples, dtype="<f4") trackData[:] = np.nan for startFrame, nFrames in segmentData: trackData[startFrame : startFrame + nFrames] = f32.bread(stream, nFrames) return EMGTrack(label, trackData) def _write(self, file) -> None: # label BTSString.bwrite(file, 256, self.label) segments = self._segments # nSegments i32.bwrite(file, len(segments)) # padding i32.bpad(file, 1) for segment in segments: # startFrame i32.bwrite(file, segment.start) # nFrames i32.bwrite(file, segment.stop - segment.start) for segment in segments: # data f32.bwrite(file, self.data[segment]) @property def nBytes(self): base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize return base def __eq__(self, other): return self.label == other.label and np.all(self.data == other.data) def __repr__(self) -> str: return ( f"EMGTrack(label={self.label}, nSamples={self.nSamples}," f"segments={len(self._segments)})" )
Ancestors
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Instance variables
var nBytes
-
Size in bytes
Expand source code
@property def nBytes(self): base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * f32.btype.itemsize return base
var nSamples : int
-
Returns
int
- number of samples of the track
Expand source code
@property def nSamples(self) -> int: """ Returns: int: number of samples of the track """ return self.data.shape[0]
class Event (label, values=[], type=EventsDataType.singleEvent)
-
A class representing a single event or a sequence of events.
Expand source code
class Event(Sized, BuildWriteable): """ A class representing a single event or a sequence of events. """ def __init__(self, label, values=[], type=EventsDataType.singleEvent) -> None: self.label = label self.type = type if not is_iterable(values): raise TypeError("Values must be iterable") if isinstance(values, np.ndarray) and values.dtype == np.dtype("<f4"): self.values = values else: self.values = np.array(values, dtype="<f4") if len(values) > 1 and type == EventsDataType.singleEvent: raise TypeError("Can't have more than one value for a single event") def _write(self, stream) -> None: BTSString.bwrite(stream, 256, self.label) u32.bwrite(stream, self.type.value) # type u32.bwrite(stream, len(self.values)) # nItems f32.bwrite(stream, self.values) @staticmethod def _build(stream) -> "Event": label = BTSString.bread(stream, 256) type_ = EventsDataType(u32.bread(stream)) nItems = i32.bread(stream) values = np.array([f32.bread(stream) for _ in range(nItems)]) return Event(label, values, type_) def __len__(self) -> int: return len(self.values) def __eq__(self, o: object) -> bool: if not isinstance(o, Event): return False return ( self.label == o.label and self.type == o.type and np.array_equal(self.values, o.values) ) @property def nBytes(self) -> int: return 256 + 4 + 4 + len(self.values) * 4 def __repr__(self) -> str: return ( "<Event " f"label={self.label} " f"type={self.type} " f"nItems={len(self.values)} " f"values={self.values}>" )
Ancestors
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Instance variables
var nBytes : int
-
Size in bytes
Expand source code
@property def nBytes(self) -> int: return 256 + 4 + 4 + len(self.values) * 4
class EventsDataType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code
class EventsDataType(Enum): singleEvent = 0 eventSequence = 1
Ancestors
- enum.Enum
Class variables
var eventSequence
var singleEvent
class MarkerTrack (label: str, track_data: numpy.ndarray)
-
A track that collects all the data of a physical marker, such as name and position.
Expand source code
class MarkerTrack(Sized, BuildWriteable): """ A track that collects all the data of a physical marker, such as name and position. """ def __init__(self, label: str, track_data: np.ndarray) -> None: self.label = label "The name of the marker" self.data = track_data "The actual marker data" @property def X(self) -> np.ndarray: """ Convenience property that returns or sets the X component of the marker position. """ return self.data[:, 0] @property def Y(self) -> np.ndarray: """ Convenience property that returns or sets the Y component of the marker position. """ return self.data[:, 1] @property def Z(self) -> np.ndarray: """ Convenience property that returns or sets the Z component of the marker position. """ return self.data[:, 2] @X.setter def X(self, value) -> None: self.data[:, 0] = value @Y.setter def Y(self, value) -> None: self.data[:, 1] = value @Z.setter def Z(self, value) -> None: self.data[:, 2] = value @property def nFrames(self) -> int: """ Returns: int: number of frames in the track """ return self.data.shape[0] @property def _segments(self): maskedTrackData = np.ma.masked_invalid(self.data) return np.ma.clump_unmasked(maskedTrackData.T[0]) @staticmethod def _build(stream, nFrames: int) -> "MarkerTrack": trackData = np.empty(nFrames, dtype=TrackType.btype) trackData[:] = np.NaN label = BTSString.bread(stream, 256) nSegments = i32.bread(stream) i32.skip(stream) segmentData = SegmentData.bread(stream, nSegments) for startFrame, nFrames in segmentData: dat = TrackType.bread(stream, nFrames) trackData[startFrame : startFrame + nFrames] = dat return MarkerTrack(label, trackData) def _write(self, file) -> None: # label BTSString.bwrite(file, 256, self.label) segments = self._segments # nSegments i32.bwrite(file, len(segments)) # padding i32.bpad(file, 1) for segment in segments: # startFrame i32.bwrite(file, np.array(segment.start)) # nFrames i32.bwrite(file, np.array(segment.stop - segment.start)) for segment in segments: # trackData TrackType.bwrite(file, self.data[segment]) @property def nBytes(self) -> int: """ Returns: int: size of the track in bytes """ base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * TrackType.btype.itemsize return base def __repr__(self) -> str: return f"Track(label={self.label}, nFrames={self.nFrames})" def __eq__(self, other): return self.label == other.label and np.all(self.data == other.data)
Ancestors
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Instance variables
var X : numpy.ndarray
-
Convenience property that returns or sets the X component of the marker position.
Expand source code
@property def X(self) -> np.ndarray: """ Convenience property that returns or sets the X component of the marker position. """ return self.data[:, 0]
var Y : numpy.ndarray
-
Convenience property that returns or sets the Y component of the marker position.
Expand source code
@property def Y(self) -> np.ndarray: """ Convenience property that returns or sets the Y component of the marker position. """ return self.data[:, 1]
var Z : numpy.ndarray
-
Convenience property that returns or sets the Z component of the marker position.
Expand source code
@property def Z(self) -> np.ndarray: """ Convenience property that returns or sets the Z component of the marker position. """ return self.data[:, 2]
var data
-
The actual marker data
var label
-
The name of the marker
var nBytes : int
-
Returns
int
- size of the track in bytes
Expand source code
@property def nBytes(self) -> int: """ Returns: int: size of the track in bytes """ base = 256 + 4 + 4 for segment in self._segments: base += 4 + 4 + (segment.stop - segment.start) * TrackType.btype.itemsize return base
var nFrames : int
-
Returns
int
- number of frames in the track
Expand source code
@property def nFrames(self) -> int: """ Returns: int: number of frames in the track """ return self.data.shape[0]
class Tdf (filename: Union[pathlib.Path, str])
-
Expand source code
class Tdf: SIGNATURE = b"\x82K`A\xd3\x11\x84\xca`\x00\xb6\xac\x16h\x0c\x08" def __init__(self, filename: Union[Path, str]) -> None: self.file_path = Path(filename) if not self.file_path.exists(): raise FileNotFoundError(f"File {self.file_path} not found") self._mode = "rb" self._inside_context = False def allow_write(self) -> "Tdf": """Allow writing to the file.""" self._mode = "r+b" return self def __enter__(self) -> "Tdf": self._inside_context = True self.handler: IO[bytes] = self.file_path.open(self._mode) self.signature = self.handler.read(len(self.SIGNATURE)) if self.signature != self.SIGNATURE: raise Exception("Invalid TDF file") self.version = u32.bread(self.handler) self.nEntries = i32.bread(self.handler) # pad 8 bytes i32.skip(self.handler, 2) self.creation_date = BTSDate.bread(self.handler) self.last_modification_date = BTSDate.bread(self.handler) self.last_access_date = BTSDate.bread(self.handler) # pad 20 bytes i32.skip(self.handler, 5) self.entries = [TdfEntry._build(self.handler) for _ in range(self.nEntries)] "List of entries in the file" return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self._inside_context = False self._mode = "rb" self.handler.close() @property @provide_context_if_needed def blocks(self) -> List[Block]: """Get all blocks in the file.""" return [self.get_block(entry.type) for entry in self.entries] @provide_context_if_needed def get_block(self, index_or_type: Union[BlockType, int]) -> Optional[Type[Block]]: """Get a block from the TDF file.""" if isinstance(index_or_type, int): if 0 <= index_or_type < len(self.entries): entry = self.entries[index_or_type] else: raise IndexError(f"Index {index_or_type} out of range") elif isinstance(index_or_type, BlockType): entry = next((e for e in self.entries if e.type == index_or_type), None) if entry is None: raise Exception(f"Block {index_or_type} not found") else: raise TypeError(f"Expected int or BlockType, got {type(index_or_type)}") self.handler.seek(entry.offset, 0) block_class = _get_block_class(entry.type) return block_class._build(self.handler, entry.format) def __getitem__( self, index_or_type: Union[BlockType, int] ) -> Optional[Type[Block]]: return self.get_block(index_or_type) @property @provide_context_if_needed def data3D(self) -> Optional[Data3D]: """ Convenience property to get/set the 3D data block. """ return self.get_block(BlockType.data3D) @data3D.setter @raise_if_outside_context def data3D(self, data: Data3D) -> None: self.replace_block(data) if self.has_data3D else self.add_block(data) @property @provide_context_if_needed def has_data3D(self) -> bool: """Check if the file has a 3D data block.""" return any(entry.type == BlockType.data3D for entry in self.entries) @property @provide_context_if_needed def force_and_torque(self) -> Optional[ForceTorque3D]: """Convenience property to get/set the force and torque data block.""" return self.get_block(BlockType.forceAndTorqueData) @force_and_torque.setter @raise_if_outside_write_context def force_and_torque(self, data: ForceTorque3D) -> None: self.replace_block(data) if self.has_force_and_torque else self.add_block(data) @property @provide_context_if_needed def has_force_and_torque(self) -> bool: """Check if the file has a force and torque data block.""" return any(entry.type == BlockType.forceAndTorqueData for entry in self.entries) @property @provide_context_if_needed def events(self) -> Optional[TemporalEventsData]: """Convenience property to get/set the events data block.""" return self.get_block(BlockType.temporalEventsData) @events.setter @raise_if_outside_write_context def events(self, data: TemporalEventsData) -> None: self.replace_block(data) if self.has_events else self.add_block(data) @property @provide_context_if_needed def has_events(self) -> bool: """Check if the TDF file has an events block""" return any(i for i in self.entries if i.type == BlockType.temporalEventsData) @property @provide_context_if_needed def emg(self) -> Optional[EMG]: """Convenience property to get/set the EMG data block.""" return self.get_block(EMG.type) @emg.setter @raise_if_outside_write_context def emg(self, data: EMG) -> None: self.replace_block(data) if self.has_emg else self.add_block(data) @property @provide_context_if_needed def has_emg(self) -> bool: """Check if the TDF file has an EMG block""" return any( entry.type == BlockType.electromyographicData for entry in self.entries ) @property @provide_context_if_needed def calibrationData(self) -> Optional[CalibrationDataBlock]: """Convenience property to get/set the calibration data block.""" return self.get_block(CalibrationDataBlock.type) @raise_if_outside_write_context def add_block( self, newBlock: Block, comment: str = "Generated by basicTDF" ) -> None: """Adds a block to the TDF file Args: newBlock (Block): the block to be added. comment (str, optional): A description for the block entry. Defaults to "Generated by basicTDF". Raises: PermissionError: the TDF is read only ValueError: there's already a block of the same type ValueError: block limit reached (14 as per BTS's implementation) IOError: unused empty blocks in the middle of the file """ if self._mode == "rb": raise PermissionError( "Can't add blocks, this file was opened in read-only mode" ) try: if self.get_block(newBlock.type): raise ValueError( ( f"There's already a block of this type {newBlock.type}" " .Remove it first" ) ) except Exception: pass # find first unused slot try: unusedBlockPos = next( n for n, i in enumerate(self.entries) if i.type == BlockType.unusedSlot ) except StopIteration: raise ValueError(f"Block limit reached ({len(self.entries)})") # write new entry with the offset of that unused slot new_entry = TdfEntry( type=newBlock.type, format=newBlock.format.value, offset=self.entries[unusedBlockPos].offset, size=newBlock.nBytes, creation_date=newBlock.creation_date, last_modification_date=newBlock.last_modification_date, last_access_date=datetime.now(), comment=comment, ) # replace the entry self.entries[unusedBlockPos] = new_entry # write new entry self.handler.seek(64 + 288 * unusedBlockPos, 0) new_entry._write(self.handler) # update all unused slots's offset for n, entry in enumerate( self.entries[unusedBlockPos + 1 :], start=unusedBlockPos + 1 ): if entry.type == BlockType.unusedSlot: entry.offset = new_entry.offset + new_entry.size self.handler.seek(64 + 288 * n, 0) entry._write(self.handler) else: raise IOError("All unused slots must be at the end of the file") # write new block self.handler.seek(new_entry.offset, 0) newBlock._write(self.handler) # ensure the file is the correct size # and that the changes are written to disk self.handler.flush() @raise_if_outside_write_context def remove_block(self, type: Block) -> None: """Remove a block of the given type from the file. Removing a block implies: - Removing the entry - Updating all subsequent unused slots's offset (subtracting the size of the removed block) - Inserting a new unused slot entry at the end (with the previous slot offset + size as offset) - If there is info after the block, move it block_to_remove.size up """ if "+" not in self._mode: raise PermissionError( "Can't remove blocks, this file was opened in read-only mode" ) # find block try: oldEntryPos, oldEntry = next( (n, i) for n, i in enumerate(self.entries) if i.type == type ) except StopIteration: raise ValueError(f"No block of type {type} found") # calculate new offset for the next unused slot newOffset = ( self.entries[-1].offset if oldEntryPos != 0 else (64 + 288 * self.nEntries) ) # delete entry self.entries.remove(oldEntry) self.handler.seek(64 + 288 * oldEntryPos, 0) # update all the offsets of the entries preceding the removed one for entry in self.entries[oldEntryPos:]: entry.offset -= oldEntry.size entry._write(self.handler) # add new unused slot at the end date = datetime.now() newEntry = TdfEntry( type=BlockType.unusedSlot, format=0, offset=newOffset, size=0, creation_date=date, last_modification_date=date, last_access_date=date, comment="Generated by basicTDF", ) self.entries.append(newEntry) newEntry._write(self.handler) self.handler.seek(oldEntry.offset + oldEntry.size, 0) temp = self.handler.read() self.handler.seek(oldEntry.offset, 0) self.handler.write(temp) self.handler.truncate() self.handler.flush() @staticmethod def new(filename: str) -> "Tdf": """Creates a new TDF file""" filePath = Path(filename) if filePath.exists(): raise FileExistsError("File already exists") nEntries = 14 date = datetime.now() with filePath.open("wb") as f: # signature f.write(Tdf.SIGNATURE) # version i32.bwrite(f, 1) # nEntries i32.bwrite(f, nEntries) # reserved i32.bpad(f, 2) # creation date BTSDate.bwrite(f, date) # last modification date BTSDate.bwrite(f, date) # last access date BTSDate.bwrite(f, date) # reserved i32.bpad(f, 5) # start entries entryOffset = 64 # all entries start with offset to the where entries stop blockOffset = entryOffset + nEntries * 288 for _ in range(nEntries): # type u32.bwrite(f, 0) # format u32.bwrite(f, 0) # offset i32.bwrite(f, blockOffset) # size i32.bwrite(f, 0) # creation date BTSDate.bwrite(f, date) # last modification date BTSDate.bwrite(f, date) # last access date BTSDate.bwrite(f, date) # reserved i32.bpad(f, 1) # comment BTSString.bwrite(f, 256, "Generated by basicTDF") return Tdf(filePath) @raise_if_outside_write_context def replace_block(self, newBlock: Block, comment: Optional[str] = None) -> None: """Replace a block of the same type with a new one. This is done by removing the old block and adding the new one.""" old_entry = next((i for i in self.entries if i.type == newBlock.type), None) if old_entry is None: raise ValueError(f"No block of type {newBlock.type} found") comment = comment if comment is not None else old_entry.comment self.remove_block(newBlock.type) self.add_block(newBlock, comment) @property def nBytes(self) -> int: """Return the size of the TDF file in bytes""" return self.file_path.stat().st_size def __len__(self) -> int: """Return the number of blocks in the TDF file that are not of type unusedSlot """ return sum(1 for i in self.entries if i.type != BlockType.unusedSlot) def __eq__(self, o: object) -> bool: if not isinstance(o, Tdf): return False return ( self.version == o.version and self.nEntries == o.nEntries and self.blocks == o.blocks ) @provide_context_if_needed def __repr__(self) -> str: return f"Tdf({self.file_path}, nEntries={self.nEntries}, nBytes={self.nBytes}"
Class variables
var SIGNATURE
Static methods
def new(filename: str) ‑> Tdf
-
Creates a new TDF file
Expand source code
@staticmethod def new(filename: str) -> "Tdf": """Creates a new TDF file""" filePath = Path(filename) if filePath.exists(): raise FileExistsError("File already exists") nEntries = 14 date = datetime.now() with filePath.open("wb") as f: # signature f.write(Tdf.SIGNATURE) # version i32.bwrite(f, 1) # nEntries i32.bwrite(f, nEntries) # reserved i32.bpad(f, 2) # creation date BTSDate.bwrite(f, date) # last modification date BTSDate.bwrite(f, date) # last access date BTSDate.bwrite(f, date) # reserved i32.bpad(f, 5) # start entries entryOffset = 64 # all entries start with offset to the where entries stop blockOffset = entryOffset + nEntries * 288 for _ in range(nEntries): # type u32.bwrite(f, 0) # format u32.bwrite(f, 0) # offset i32.bwrite(f, blockOffset) # size i32.bwrite(f, 0) # creation date BTSDate.bwrite(f, date) # last modification date BTSDate.bwrite(f, date) # last access date BTSDate.bwrite(f, date) # reserved i32.bpad(f, 1) # comment BTSString.bwrite(f, 256, "Generated by basicTDF") return Tdf(filePath)
Instance variables
var blocks : List[Block]
-
Get all blocks in the file.
Expand source code
@property @provide_context_if_needed def blocks(self) -> List[Block]: """Get all blocks in the file.""" return [self.get_block(entry.type) for entry in self.entries]
var calibrationData : Optional[CalibrationDataBlock]
-
Convenience property to get/set the calibration data block.
Expand source code
@property @provide_context_if_needed def calibrationData(self) -> Optional[CalibrationDataBlock]: """Convenience property to get/set the calibration data block.""" return self.get_block(CalibrationDataBlock.type)
var data3D : Optional[Data3D]
-
Convenience property to get/set the 3D data block.
Expand source code
@property @provide_context_if_needed def data3D(self) -> Optional[Data3D]: """ Convenience property to get/set the 3D data block. """ return self.get_block(BlockType.data3D)
var emg : Optional[EMG]
-
Convenience property to get/set the EMG data block.
Expand source code
@property @provide_context_if_needed def emg(self) -> Optional[EMG]: """Convenience property to get/set the EMG data block.""" return self.get_block(EMG.type)
var events : Optional[TemporalEventsData]
-
Convenience property to get/set the events data block.
Expand source code
@property @provide_context_if_needed def events(self) -> Optional[TemporalEventsData]: """Convenience property to get/set the events data block.""" return self.get_block(BlockType.temporalEventsData)
var force_and_torque : Optional[ForceTorque3D]
-
Convenience property to get/set the force and torque data block.
Expand source code
@property @provide_context_if_needed def force_and_torque(self) -> Optional[ForceTorque3D]: """Convenience property to get/set the force and torque data block.""" return self.get_block(BlockType.forceAndTorqueData)
var has_data3D : bool
-
Check if the file has a 3D data block.
Expand source code
@property @provide_context_if_needed def has_data3D(self) -> bool: """Check if the file has a 3D data block.""" return any(entry.type == BlockType.data3D for entry in self.entries)
var has_emg : bool
-
Check if the TDF file has an EMG block
Expand source code
@property @provide_context_if_needed def has_emg(self) -> bool: """Check if the TDF file has an EMG block""" return any( entry.type == BlockType.electromyographicData for entry in self.entries )
var has_events : bool
-
Check if the TDF file has an events block
Expand source code
@property @provide_context_if_needed def has_events(self) -> bool: """Check if the TDF file has an events block""" return any(i for i in self.entries if i.type == BlockType.temporalEventsData)
var has_force_and_torque : bool
-
Check if the file has a force and torque data block.
Expand source code
@property @provide_context_if_needed def has_force_and_torque(self) -> bool: """Check if the file has a force and torque data block.""" return any(entry.type == BlockType.forceAndTorqueData for entry in self.entries)
var nBytes : int
-
Return the size of the TDF file in bytes
Expand source code
@property def nBytes(self) -> int: """Return the size of the TDF file in bytes""" return self.file_path.stat().st_size
Methods
def add_block(self, newBlock: Block, comment: str = 'Generated by basicTDF') ‑> None
-
Adds a block to the TDF file
Args
newBlock
:Block
- the block to be added.
comment
:str
, optional- A description for the block entry.
Defaults to "Generated by basicTDF".
Raises
PermissionError
- the TDF is read only
ValueError
- there's already a block of the same type
ValueError
- block limit reached (14 as per BTS's implementation)
IOError
- unused empty blocks in the middle of the file
Expand source code
@raise_if_outside_write_context def add_block( self, newBlock: Block, comment: str = "Generated by basicTDF" ) -> None: """Adds a block to the TDF file Args: newBlock (Block): the block to be added. comment (str, optional): A description for the block entry. Defaults to "Generated by basicTDF". Raises: PermissionError: the TDF is read only ValueError: there's already a block of the same type ValueError: block limit reached (14 as per BTS's implementation) IOError: unused empty blocks in the middle of the file """ if self._mode == "rb": raise PermissionError( "Can't add blocks, this file was opened in read-only mode" ) try: if self.get_block(newBlock.type): raise ValueError( ( f"There's already a block of this type {newBlock.type}" " .Remove it first" ) ) except Exception: pass # find first unused slot try: unusedBlockPos = next( n for n, i in enumerate(self.entries) if i.type == BlockType.unusedSlot ) except StopIteration: raise ValueError(f"Block limit reached ({len(self.entries)})") # write new entry with the offset of that unused slot new_entry = TdfEntry( type=newBlock.type, format=newBlock.format.value, offset=self.entries[unusedBlockPos].offset, size=newBlock.nBytes, creation_date=newBlock.creation_date, last_modification_date=newBlock.last_modification_date, last_access_date=datetime.now(), comment=comment, ) # replace the entry self.entries[unusedBlockPos] = new_entry # write new entry self.handler.seek(64 + 288 * unusedBlockPos, 0) new_entry._write(self.handler) # update all unused slots's offset for n, entry in enumerate( self.entries[unusedBlockPos + 1 :], start=unusedBlockPos + 1 ): if entry.type == BlockType.unusedSlot: entry.offset = new_entry.offset + new_entry.size self.handler.seek(64 + 288 * n, 0) entry._write(self.handler) else: raise IOError("All unused slots must be at the end of the file") # write new block self.handler.seek(new_entry.offset, 0) newBlock._write(self.handler) # ensure the file is the correct size # and that the changes are written to disk self.handler.flush()
def allow_write(self) ‑> Tdf
-
Allow writing to the file.
Expand source code
def allow_write(self) -> "Tdf": """Allow writing to the file.""" self._mode = "r+b" return self
def get_block(self, index_or_type: Union[BlockType, int]) ‑> Optional[Type[Block]]
-
Get a block from the TDF file.
Expand source code
@provide_context_if_needed def get_block(self, index_or_type: Union[BlockType, int]) -> Optional[Type[Block]]: """Get a block from the TDF file.""" if isinstance(index_or_type, int): if 0 <= index_or_type < len(self.entries): entry = self.entries[index_or_type] else: raise IndexError(f"Index {index_or_type} out of range") elif isinstance(index_or_type, BlockType): entry = next((e for e in self.entries if e.type == index_or_type), None) if entry is None: raise Exception(f"Block {index_or_type} not found") else: raise TypeError(f"Expected int or BlockType, got {type(index_or_type)}") self.handler.seek(entry.offset, 0) block_class = _get_block_class(entry.type) return block_class._build(self.handler, entry.format)
def remove_block(self, type: Block) ‑> None
-
Remove a block of the given type from the file. Removing a block implies:
- Removing the entry
- Updating all subsequent unused slots's offset (subtracting the size of the removed block)
- Inserting a new unused slot entry at the end (with the previous slot offset + size as offset)
- If there is info after the block, move it block_to_remove.size up
Expand source code
@raise_if_outside_write_context def remove_block(self, type: Block) -> None: """Remove a block of the given type from the file. Removing a block implies: - Removing the entry - Updating all subsequent unused slots's offset (subtracting the size of the removed block) - Inserting a new unused slot entry at the end (with the previous slot offset + size as offset) - If there is info after the block, move it block_to_remove.size up """ if "+" not in self._mode: raise PermissionError( "Can't remove blocks, this file was opened in read-only mode" ) # find block try: oldEntryPos, oldEntry = next( (n, i) for n, i in enumerate(self.entries) if i.type == type ) except StopIteration: raise ValueError(f"No block of type {type} found") # calculate new offset for the next unused slot newOffset = ( self.entries[-1].offset if oldEntryPos != 0 else (64 + 288 * self.nEntries) ) # delete entry self.entries.remove(oldEntry) self.handler.seek(64 + 288 * oldEntryPos, 0) # update all the offsets of the entries preceding the removed one for entry in self.entries[oldEntryPos:]: entry.offset -= oldEntry.size entry._write(self.handler) # add new unused slot at the end date = datetime.now() newEntry = TdfEntry( type=BlockType.unusedSlot, format=0, offset=newOffset, size=0, creation_date=date, last_modification_date=date, last_access_date=date, comment="Generated by basicTDF", ) self.entries.append(newEntry) newEntry._write(self.handler) self.handler.seek(oldEntry.offset + oldEntry.size, 0) temp = self.handler.read() self.handler.seek(oldEntry.offset, 0) self.handler.write(temp) self.handler.truncate() self.handler.flush()
def replace_block(self, newBlock: Block, comment: Optional[str] = None) ‑> None
-
Replace a block of the same type with a new one. This is done by removing the old block and adding the new one.
Expand source code
@raise_if_outside_write_context def replace_block(self, newBlock: Block, comment: Optional[str] = None) -> None: """Replace a block of the same type with a new one. This is done by removing the old block and adding the new one.""" old_entry = next((i for i in self.entries if i.type == newBlock.type), None) if old_entry is None: raise ValueError(f"No block of type {newBlock.type} found") comment = comment if comment is not None else old_entry.comment self.remove_block(newBlock.type) self.add_block(newBlock, comment)
class TemporalEventsData (format=TemporalEventsDataFormat.standard, start_time=0.0)
-
The Events data block. Stores the temporal events data.
Expand source code
class TemporalEventsData(Block): """ The Events data block. Stores the temporal events data. """ type = BlockType.temporalEventsData def __init__(self, format=TemporalEventsDataFormat.standard, start_time=0.0): super().__init__() self.format = format self.start_time = start_time self.events = [] @staticmethod def _build(stream, format) -> "TemporalEventsData": format = TemporalEventsDataFormat(format) nEvents = i32.bread(stream) start_time = f32.bread(stream) t = TemporalEventsData(format, start_time) t.events = [Event._build(stream) for _ in range(nEvents)] return t def _write(self, stream) -> None: i32.bwrite(stream, len(self.events)) f32.bwrite(stream, self.start_time) for event in self.events: event._write(stream) @property def nBytes(self) -> int: return 4 + 4 + sum(i.nBytes for i in self.events) def __len__(self) -> int: return len(self.events) def __getitem__(self, item: Union[int, str]) -> Event: """ Returns the event with the given index or label. Args: item (Union[int, str]): index or label of the event Raises: KeyError: if the event with the given label is not found TypeError: if the key type is not int or str """ if isinstance(item, int): return self.events[item] elif isinstance(item, str): try: return next(e for e in self.events if e.label == item) except StopIteration: raise KeyError(f"Event with label {item} not found") raise TypeError(f"Invalid key type: {type(item)}") def __iter__(self) -> Iterator[Event]: return iter(self.events) def __contains__(self, value: Union[Event, str]) -> bool: if isinstance(value, Event): return value in self.events elif isinstance(value, str): return any(value == event.label for event in self.events) raise TypeError(f"Invalid key type: {type(value)}") def __eq__(self, other) -> bool: if not isinstance(other, TemporalEventsData): return False return ( self.format == other.format and self.start_time == other.start_time and all(e1 == e2 for e1, e2 in zip(self.events, other.events)) ) def __repr__(self) -> str: return ( "<TemporalEventsData " f"format={self.format.name} " f"nEvents={len(self.events)} " f"start_time={self.start_time} " f"events={self.events} " f"size={self.nBytes}>" )
Ancestors
- Block
- basictdf.tdfBlock.Sized
- basictdf.tdfBlock.BuildWriteable
- abc.ABC
Class variables
var type
Instance variables
var nBytes : int
-
Size in bytes
Expand source code
@property def nBytes(self) -> int: return 4 + 4 + sum(i.nBytes for i in self.events)