Newer
Older
B-ASIC Port Module.
TODO: More info.
"""
from abc import ABC, abstractmethod
from typing import NewType, Optional, List
Angus Lothian
committed
from b_asic.signal import Signal
Angus Lothian
committed
PortIndex = NewType("PortIndex", int)
Angus Lothian
committed
"""Port Interface.
Angus Lothian
committed
Angus Lothian
committed
TODO: More documentation?
Angus Lothian
committed
@abstractmethod
Angus Lothian
committed
"""Return the connected operation."""
Angus Lothian
committed
raise NotImplementedError
@property
@abstractmethod
def index(self) -> PortIndex:
    """Return the unique PortIndex of this port.

    Declared abstract: the interface itself only raises NotImplementedError.
    """
    raise NotImplementedError
@property
@abstractmethod
def signals(self) -> List[Signal]:
Angus Lothian
committed
"""Return a list of all connected signals."""
Angus Lothian
committed
@property
@abstractmethod
def connected_ports(self) -> List["Port"]:
    """Return a list of every Port connected to this port.

    Declared abstract: the interface itself only raises NotImplementedError.
    """
    raise NotImplementedError
@abstractmethod
def signal_count(self) -> int:
    """Return how many signals are connected to this port.

    Declared abstract: the interface itself only raises NotImplementedError.
    """
    raise NotImplementedError
@abstractmethod
Angus Lothian
committed
def connect(self, port: "Port") -> Signal:
Angus Lothian
committed
"""Create and return a signal that is connected to this port and the entered
port and connect this port to the signal and the entered port to the signal.
"""
Angus Lothian
committed
def add_signal(self, signal: Signal) -> None:
Angus Lothian
committed
"""Connect this port to the entered signal. If the entered signal isn't connected to
this port then connect the entered signal to the port as well.
"""
Angus Lothian
committed
def disconnect(self, port: "Port") -> None:
"""Disconnect the entered port from the port by removing it from the port's signal.
If the entered port is still connected to this port's signal then disconnect the entered
Angus Lothian
committed
raise NotImplementedError
@abstractmethod
def remove_signal(self, signal: Signal) -> None:
    """Remove the entered signal from this port's signals.

    If the entered signal is still connected to this port, the signal is
    disconnected from the port as well.

    Keyword arguments:
    - signal: Signal to remove.
    """
    raise NotImplementedError
@abstractmethod
def clear(self) -> None:
"""Removes all connected signals from the Port."""
Angus Lothian
committed
class AbstractPort(Port):
    """Abstract port class.

    Stores the port index and a reference to the parent operation, and exposes
    both through read-only properties.
    """

    # Annotated as PortIndex (not plain int) so the stored value agrees with
    # the return type of the ``index`` property below.
    _index: PortIndex
    _operation: Operation

    def __init__(self, index: PortIndex, operation: Operation):
        """Store the port index and the operation this port belongs to.

        Keyword arguments:
        - index: index of this port.
        - operation: operation that this port is saved as belonging to.
        """
        self._index = index
        self._operation = operation

    @property
    def operation(self) -> Operation:
        """Return the operation passed to the constructor."""
        return self._operation

    @property
    def index(self) -> PortIndex:
        """Return the port index passed to the constructor."""
        return self._index
class InputPort(AbstractPort):
Angus Lothian
committed
Angus Lothian
committed
_source_signal: Optional[Signal]
Angus Lothian
committed
def __init__(self, port_id: PortIndex, operation: Operation):
Angus Lothian
committed
self._source_signal = None
Angus Lothian
committed
return [] if self._source_signal is None else [self._source_signal]
@property
def value_length(self) -> Optional[int]:
    """Return the value length currently stored on this port (may be None)."""
    return self._value_length
Angus Lothian
committed
@property
def connected_ports(self) -> List[Port]:
    """Return the source port of the connected signal as a one-element list.

    An empty list is returned when no signal is connected or when the
    connected signal has no source.
    """
    source_signal = self._source_signal
    if source_signal is None:
        return []
    source_port = source_signal.source
    if source_port is None:
        return []
    return [source_port]
@value_length.setter
def value_length(self, bits: Optional[int]) -> None:
    """Set the value length of this port.

    Keyword arguments:
    - bits: a non-negative integer, or None.
    """
    assert bits is None or (isinstance(bits, int) and bits >= 0), \
        "Value length on input port has to be a non-negative integer or None."
    self._value_length = bits
Angus Lothian
committed
return 0 if self._source_signal is None else 1
Angus Lothian
committed
def connect(self, port: "OutputPort") -> Signal:
    """Create and return a Signal built from the entered port and this port.

    This port must not already have a source signal.
    """
    assert self._source_signal is None, "Connecting new port to already connected input port."
    # self._source_signal is set by the signal constructor.
    new_signal = Signal(port, self)
    return new_signal
Angus Lothian
committed
def add_signal(self, signal: Signal) -> None:
    """Store the entered signal as this port's source signal.

    This port must not already have a source signal. If the signal's
    destination is not this port, the signal is told to use this port as its
    destination.
    """
    assert self._source_signal is None, "Connecting new port to already connected input port."
    self._source_signal = signal
    if signal.destination is not self:
        # Make this input port the signal's destination if it isn't already.
        signal.set_destination(self)
Angus Lothian
committed
def disconnect(self, port: "OutputPort") -> None:
    """Disconnect the entered port by removing it as source of this port's signal.

    The signal itself stays attached to this input port; only its source is
    removed.

    Keyword arguments:
    - port: the port expected to be the source of this port's signal.
    """
    source_signal = self._source_signal
    # Check for a missing signal first so an unconnected port fails with the
    # intended assertion message instead of an AttributeError on None.
    assert source_signal is not None and source_signal.source is port, \
        "The entered port is not connected to this port."
    source_signal.remove_source()
def remove_signal(self, signal: Signal) -> None:
    """Detach the current source signal from this port.

    The stored source signal is cleared first. If that signal still has this
    port as its destination, its destination is removed too.

    NOTE(review): the ``signal`` argument is not inspected here; whichever
    signal is currently stored as the source is the one that gets removed.
    """
    detached = self._source_signal
    self._source_signal = None
    if detached.destination is self:
        # Disconnect the signal's destination if it is still this input port.
        detached.remove_destination()
Angus Lothian
committed
def clear(self) -> None:
    """Remove the connected source signal, if any, from this port."""
    # With nothing connected there is nothing to remove; passing None on to
    # remove_signal() would make it dereference None.
    if self._source_signal is not None:
        self.remove_signal(self._source_signal)
Angus Lothian
committed
class OutputPort(AbstractPort):
Angus Lothian
committed
_destination_signals: List[Signal]
Angus Lothian
committed
def __init__(self, port_id: PortIndex, operation: Operation):
Angus Lothian
committed
self._destination_signals = []
Angus Lothian
committed
return self._destination_signals.copy()
Angus Lothian
committed
assert 0 <= i < self.signal_count(), "Signal index out of bounds."
Angus Lothian
committed
return self._destination_signals[i]
Angus Lothian
committed
@property
def connected_ports(self) -> List[Port]:
    """Return the destination of every connected signal that has one."""
    destinations = []
    for signal in self._destination_signals:
        destination = signal.destination
        if destination is not None:
            destinations.append(destination)
    return destinations
Angus Lothian
committed
def signal_count(self) -> int:
    """Return the number of signals stored on this output port."""
    connected = self._destination_signals
    return len(connected)
Angus Lothian
committed
Angus Lothian
committed
def connect(self, port: InputPort) -> Signal:
    """Create and return a Signal built from this port and the entered port."""
    # Signal is added to self._destination_signals in signal constructor.
    new_signal = Signal(self, port)
    return new_signal
Angus Lothian
committed
Angus Lothian
committed
def add_signal(self, signal: Signal) -> None:
    """Append the entered signal to this port's destination signals.

    The signal must not already be among this port's signals. If the signal's
    source is not this port, the signal is told to use this port as its source.
    """
    assert signal not in self.signals, "Attempting to connect to Signal already connected."
    self._destination_signals.append(signal)
    if signal.source is not self:
        # Make this output port the signal's source if it isn't already.
        signal.set_source(self)
def disconnect(self, port: InputPort) -> None:
    """Remove the destination of the first signal that leads to the entered port.

    The entered port must be one of this port's connected ports. Only the first
    matching signal is touched.
    """
    assert port in self.connected_ports, "Attempting to disconnect port that isn't connected."
    matching = next(
        (candidate for candidate in self._destination_signals
         if candidate.destination is port),
        None,
    )
    if matching is not None:
        matching.remove_destination()
def remove_signal(self, signal: Signal) -> None:
    """Remove the entered signal from this port's destination signals.

    ``list.index`` raises ValueError if the signal is not stored on this port.
    If the removed signal still has this port as its source, its source is
    removed as well.
    """
    position = self._destination_signals.index(signal)
    detached = self._destination_signals.pop(position)
    if detached.source is self:
        detached.remove_source()
Angus Lothian
committed
Angus Lothian
committed
def clear(self) -> None:
    """Remove every connected signal from this port."""
    # remove_signal() deletes entries from self._destination_signals, so walk a
    # snapshot; iterating the live list while it shrinks skips every other signal.
    for signal in self._destination_signals.copy():
        self.remove_signal(signal)