"""@package docstring B-ASIC Port Module. TODO: More info. """ from abc import ABC, abstractmethod from typing import NewType, Optional, List from b_asic.operation import Operation from b_asic.signal import Signal PortIndex = NewType("PortIndex", int) class Port(ABC): """Port Interface. TODO: More documentaiton? """ @property @abstractmethod def operation(self) -> Operation: """Return the connected operation.""" raise NotImplementedError @property @abstractmethod def index(self) -> PortIndex: """Return the unique PortIndex.""" raise NotImplementedError @property @abstractmethod def signals(self) -> List[Signal]: """Return a list of all connected signals.""" raise NotImplementedError @property @abstractmethod def connected_ports(self) -> List["Port"]: """Return a list of all connected Ports.""" raise NotImplementedError @abstractmethod def signal_count(self) -> int: """Return the number of connected signals.""" raise NotImplementedError @abstractmethod def connect(self, port: "Port") -> Signal: """Create and return a signal that is connected to this port and the entered port and connect this port to the signal and the entered port to the signal. """ raise NotImplementedError @abstractmethod def add_signal(self, signal: Signal) -> None: """Connect this port to the entered signal. If the entered signal isn't connected to this port then connect the entered signal to the port aswell. """ raise NotImplementedError @abstractmethod def disconnect(self, port: "Port") -> None: """Disconnect the entered port from the port by removing it from the ports signal. If the entered port is still connected to this ports signal then disconnect the entered port from the signal aswell. """ raise NotImplementedError @abstractmethod def remove_signal(self, signal: Signal) -> None: """Remove the signal that was entered from the Ports signals. If the entered signal still is connected to this port then disconnect the entered signal from the port aswell. Keyword arguments: - signal: Signal to remove. """ raise NotImplementedError @abstractmethod def clear(self) -> None: """Removes all connected signals from the Port.""" raise NotImplementedError class AbstractPort(Port): """Abstract port class. Handles functionality for port id and saves the connection to the parent operation. """ _index: int _operation: Operation def __init__(self, index: int, operation: Operation): self._index = index self._operation = operation @property def operation(self) -> Operation: return self._operation @property def index(self) -> PortIndex: return self._index class InputPort(AbstractPort): """Input port. TODO: More info. """ _source_signal: Optional[Signal] _value_length: Optional[int] def __init__(self, port_id: PortIndex, operation: Operation): super().__init__(port_id, operation) self._source_signal = None self._value_length = None @property def signals(self) -> List[Signal]: return [] if self._source_signal is None else [self._source_signal] @property def value_length(self) -> Optional[int]: return self._value_length @property def connected_ports(self) -> List[Port]: return [] if self._source_signal is None or self._source_signal.source is None \ else [self._source_signal.source] @value_length.setter def value_length(self, bits: Optional[int]) -> None: assert (isinstance(bits, int) and bits >= 0) or bits is None, \ "Value length on input port has to be a non-negative integer or None." self._value_length = bits def signal_count(self) -> int: return 0 if self._source_signal is None else 1 def connect(self, port: "OutputPort") -> Signal: assert self._source_signal is None, "Connecting new port to already connected input port." return Signal(port, self) # self._source_signal is set by the signal constructor. def add_signal(self, signal: Signal) -> None: assert self._source_signal is None, "Connecting new port to already connected input port." self._source_signal: Signal = signal if self is not signal.destination: # Connect this inputport as destination for this signal if it isn't already. signal.set_destination(self) def disconnect(self, port: "OutputPort") -> None: assert self._source_signal.source is port, "The entered port is not connected to this port." self._source_signal.remove_source() def remove_signal(self, signal: Signal) -> None: old_signal: Signal = self._source_signal self._source_signal = None if self is old_signal.destination: # Disconnect the dest of the signal if this inputport currently is the dest. old_signal.remove_destination() def clear(self) -> None: self.remove_signal(self._source_signal) class OutputPort(AbstractPort): """Output port. TODO: More info. """ _destination_signals: List[Signal] def __init__(self, port_id: PortIndex, operation: Operation): super().__init__(port_id, operation) self._destination_signals = [] @property def signals(self) -> List[Signal]: return self._destination_signals.copy() def signal(self, i: int = 0) -> Signal: assert 0 <= i < self.signal_count(), "Signal index out of bounds." return self._destination_signals[i] @property def connected_ports(self) -> List[Port]: return [signal.destination for signal in self._destination_signals \ if signal.destination is not None] def signal_count(self) -> int: return len(self._destination_signals) def connect(self, port: InputPort) -> Signal: return Signal(self, port) # Signal is added to self._destination_signals in signal constructor. def add_signal(self, signal: Signal) -> None: assert signal not in self.signals, \ "Attempting to connect to Signal already connected." self._destination_signals.append(signal) if self is not signal.source: # Connect this outputport to the signal if it isn't already. signal.set_source(self) def disconnect(self, port: InputPort) -> None: assert port in self.connected_ports, "Attempting to disconnect port that isn't connected." for sig in self._destination_signals: if sig.destination is port: sig.remove_destination() break def remove_signal(self, signal: Signal) -> None: i: int = self._destination_signals.index(signal) old_signal: Signal = self._destination_signals[i] del self._destination_signals[i] if self is old_signal.source: old_signal.remove_source() def clear(self) -> None: for signal in self._destination_signals: self.remove_signal(signal)