    """@package docstring

    B-ASIC Port Module.
    Defines the abstract Port interface and the input/output port classes
    that are connected to each other through signals.
    """
    
    from abc import ABC, abstractmethod
    
    from typing import NewType, Optional, List
    
    
    from b_asic.operation import Operation

    # NOTE(review): `Signal` is used in annotations and constructed in this module,
    # but no import for it is visible here -- confirm where it should come from.

    # Distinct type for port indices (a plain `int` at runtime).
    PortIndex = NewType("PortIndex", int)
    
    class Port(ABC):
    
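        """Port interface.

        Declares what every port must provide: access to its parent operation,
        its index and its connected signals/ports, plus methods for connecting
        and disconnecting signals.
        """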
    
        @property
        @abstractmethod
        def operation(self) -> Operation:
            """Return the Operation this port belongs to."""
            raise NotImplementedError
    
        @property
        @abstractmethod
        def index(self) -> PortIndex:
            """Return the unique PortIndex of this port."""
            raise NotImplementedError
    
        @property
        @abstractmethod
        def signals(self) -> List[Signal]:
            """Return a list of all signals connected to this port."""
            raise NotImplementedError
    
    
        @property
        @abstractmethod
        def connected_ports(self) -> List["Port"]:
            """Return a list of all ports connected to this port."""
            raise NotImplementedError
    
        @abstractmethod
        def signal_count(self) -> int:

            """Return the number of signals connected to this port.

            Abstract: concrete port classes must override this method.
            """
            raise NotImplementedError
    
        @abstractmethod
    
        def connect(self, port: "Port") -> Signal:
            """Create and return a signal connecting this port and the entered port.

            Both this port and the entered port are connected to the new signal.

            Keyword arguments:
            - port: Port to connect to.
            """
            raise NotImplementedError
    
        @abstractmethod
    
        def add_signal(self, signal: Signal) -> None:
            """Connect this port to the entered signal.

            If the entered signal isn't connected to this port, the signal is
            connected to the port as well.

            Keyword arguments:
            - signal: Signal to add.
            """
            raise NotImplementedError
    
        @abstractmethod
    
        def disconnect(self, port: "Port") -> None:
            """Disconnect the entered port from this port.

            The entered port is removed from this port's signal. If the entered
            port is still connected to that signal, it is disconnected from the
            signal as well.

            Keyword arguments:
            - port: Port to disconnect.
            """
            # Like every other abstract method on this interface, the base
            # implementation must not silently succeed.
            raise NotImplementedError
    
        @abstractmethod
        def remove_signal(self, signal: Signal) -> None:
            """Remove the entered signal from this port's signals.

            If the entered signal is still connected to this port, the signal is
            disconnected from the port as well.

            Keyword arguments:
            - signal: Signal to remove.
            """
            raise NotImplementedError
    
        @abstractmethod
        def clear(self) -> None:
            """Remove all connected signals from the port."""
            raise NotImplementedError
    
    class AbstractPort(Port):
        """Partial Port implementation shared by the concrete port classes.

        Stores the port's index and a reference to its parent operation, and
        exposes both through read-only properties.
        """

        _index: int
        _operation: Operation

        def __init__(self, index: int, operation: Operation):
            """Store the index and the parent operation of the port.

            Keyword arguments:
            - index: Index of this port.
            - operation: Operation this port belongs to.
            """
            self._operation = operation
            self._index = index

        @property
        def index(self) -> PortIndex:
            """Return the index given at construction."""
            return self._index

        @property
        def operation(self) -> Operation:
            """Return the operation given at construction."""
            return self._operation
    
    
    class InputPort(AbstractPort):
    
        """Input port.

        Accepts at most one incoming (source) signal and carries an optional
        value length, which is a non-negative integer or None.
        """
    
        _value_length: Optional[int]
    
        def __init__(self, port_id: PortIndex, operation: Operation):
            """Create an input port with no source signal and no value length.

            Keyword arguments:
            - port_id: Index of this port.
            - operation: Operation this port belongs to.
            """
            super().__init__(port_id, operation)
            # Every accessor on this class reads _source_signal, so it has to
            # exist from construction on; None means "not connected".
            self._source_signal = None
            self._value_length = None

        @property
        def signals(self) -> List[Signal]:
            """Return the source signal in a one-element list, or an empty list if there is none."""
            source_signal = self._source_signal
            if source_signal is None:
                return []
            return [source_signal]
    
        @property
        def value_length(self) -> Optional[int]:
            """Return the value length stored on this port, or None if none is set."""
            return self._value_length
    
    
        @property
        def connected_ports(self) -> List[Port]:
            """Return the source port of the source signal in a list.

            The list is empty when no signal is connected or when the connected
            signal has no source.
            """
            source_signal = self._source_signal
            if source_signal is None:
                return []
            if source_signal.source is None:
                return []
            return [source_signal.source]
    
        @value_length.setter
        def value_length(self, bits: Optional[int]) -> None:
            """Set the value length of this port.

            Keyword arguments:
            - bits: Non-negative integer, or None to unset the value length.

            Fails an assertion for any other value.
            """
            is_valid = bits is None or (isinstance(bits, int) and bits >= 0)
            assert is_valid, \
                "Value length on input port has to be a non-negative integer or None."
            self._value_length = bits

        def signal_count(self) -> int:
            """Return 1 if a source signal is connected, otherwise 0."""
            if self._source_signal is None:
                return 0
            return 1
    
        def connect(self, port: "OutputPort") -> Signal:
            """Create and return a signal from the entered output port to this port.

            Fails an assertion if this port already has a source signal.

            Keyword arguments:
            - port: Output port to use as the source of the new signal.
            """
            assert self._source_signal is None, "Connecting new port to already connected input port."
            # self._source_signal is set by the signal constructor.
            new_signal = Signal(port, self)
            return new_signal
    
        def add_signal(self, signal: Signal) -> None:
            """Register the entered signal as this port's source signal.

            If this port is not already the signal's destination, the signal is
            told to use this port as its destination.
            Fails an assertion if this port already has a source signal.

            Keyword arguments:
            - signal: Signal to connect.
            """
            assert self._source_signal is None, "Connecting new port to already connected input port."
            self._source_signal: Signal = signal
            if self is not signal.destination:
                # Connect this inputport as destination for this signal if it isn't already.
                # NOTE(review): the statement under this `if` was missing from the file;
                # `set_destination` is assumed to be the counterpart of `Signal.set_source`
                # used by the output port -- confirm against the Signal class.
                signal.set_destination(self)
    
        def disconnect(self, port: "OutputPort") -> None:
            """Detach the entered output port from this port's source signal.

            Fails an assertion unless the entered port is the source of the
            currently connected signal.

            Keyword arguments:
            - port: Output port to disconnect.
            """
            source_signal = self._source_signal
            assert source_signal.source is port, "The entered port is not connected to this port."
            source_signal.remove_source()
    
        def remove_signal(self, signal: Signal) -> None:
            """Remove this port's source signal.

            The port's source signal is cleared first; if that signal still has
            this port as its destination, the signal's destination is removed too.
            The `signal` argument is not consulted: whichever signal currently is
            the source is the one removed.

            Keyword arguments:
            - signal: Signal to remove.
            """
            old_signal = self._source_signal
            self._source_signal = None
            # old_signal is None when nothing was connected, e.g. clear() on an
            # unconnected port; there is nothing further to detach in that case.
            if old_signal is not None and self is old_signal.destination:
                # Disconnect the dest of the signal if this inputport currently is the dest.
                old_signal.remove_destination()
    
        def clear(self) -> None:
            """Remove the port's current source signal through remove_signal."""
            current_signal = self._source_signal
            self.remove_signal(current_signal)
    
    class OutputPort(AbstractPort):
        """Output port.

        Keeps a list of the signals that have this port as their source.
        """
    
        def __init__(self, port_id: PortIndex, operation: Operation):
            """Create an output port with no connected signals.

            Keyword arguments:
            - port_id: Index of this port.
            - operation: Operation this port belongs to.
            """
            super().__init__(port_id, operation)
            # The other methods append to, index and iterate this list, so it has
            # to exist from construction on.
            self._destination_signals = []

        @property
        def signals(self) -> List[Signal]:
            """Return a list of all signals connected to this port.

            A copy is returned so callers cannot alter the port's own list.
            """
            # NOTE(review): the body of this property was missing from the file;
            # returning a copy of _destination_signals is assumed -- confirm.
            return self._destination_signals.copy()

        def signal(self, i: int = 0) -> Signal:
            """Return the connected signal at the entered position.

            Fails an assertion if the position is outside 0 <= i < signal_count().

            Keyword arguments:
            - i: Position of the signal among this port's signals (default 0).
            """
            assert 0 <= i < self.signal_count(), "Signal index out of bounds."
            # The bounds check above is against signal_count(), so the lookup is
            # into the same list of connected signals.
            return self._destination_signals[i]
    
        @property
        def connected_ports(self) -> List[Port]:
            """Return the destination port of every connected signal that has one."""
            # NOTE(review): the tail of this comprehension was missing from the file;
            # the None filter mirrors InputPort.connected_ports, which leaves out a
            # signal without a source -- confirm.
            return [signal.destination for signal in self._destination_signals
                    if signal.destination is not None]

        def signal_count(self) -> int:
            """Return the number of signals connected to this port."""
            # Port declares signal_count as abstract and signal() relies on it,
            # but no implementation for the output port was present.
            return len(self._destination_signals)
    
        def connect(self, port: InputPort) -> Signal:
            """Create and return a signal from this port to the entered input port.

            Keyword arguments:
            - port: Input port to use as the destination of the new signal.
            """
            # Signal is added to self._destination_signals in signal constructor.
            new_signal = Signal(self, port)
            return new_signal
    
        def add_signal(self, signal: Signal) -> None:
            """Add the entered signal to this port's connected signals.

            If this port is not already the signal's source, the signal is told
            to use this port as its source.
            Fails an assertion if the signal is already connected to this port.

            Keyword arguments:
            - signal: Signal to connect.
            """
            assert signal not in self.signals, \
                "Attempting to connect to Signal already connected."
            self._destination_signals.append(signal)
            # The guard was missing from the file; it mirrors the
            # `self is not signal.destination` check in InputPort.add_signal.
            if self is not signal.source:
                # Connect this outputport to the signal if it isn't already.
                signal.set_source(self)
    
        def disconnect(self, port: InputPort) -> None:
            """Detach the entered input port from this port.

            The destination is removed from the first connected signal whose
            destination is the entered port.
            Fails an assertion if the entered port is not among the connected ports.

            Keyword arguments:
            - port: Input port to disconnect.
            """
            assert port in self.connected_ports, "Attempting to disconnect port that isn't connected."
            matching_signal = next(
                (sig for sig in self._destination_signals if sig.destination is port),
                None,
            )
            if matching_signal is not None:
                matching_signal.remove_destination()
    
        def remove_signal(self, signal: Signal) -> None:
            """Remove the entered signal from this port's connected signals.

            Raises ValueError (from the list lookup) if the signal is not connected.

            Keyword arguments:
            - signal: Signal to remove.
            """
            # NOTE(review): only this port's list is updated here; the signal's own
            # source reference is left untouched -- confirm that is intended.
            position = self._destination_signals.index(signal)
            self._destination_signals.pop(position)
    
        def clear(self) -> None:
            """Remove every connected signal from this port."""
            # Walk a snapshot: remove_signal deletes from _destination_signals, and
            # deleting from a list while iterating it skips every other element.
            for signal in list(self._destination_signals):
                self.remove_signal(signal)