"""B-ASIC Port Module.

Contains the port interface together with the abstract, input and output
port classes that attach signals to operations.
"""
from abc import ABC, abstractmethod
from copy import copy
from typing import NewType, Optional, List, Iterable, TYPE_CHECKING
Angus Lothian
committed
from b_asic.signal import Signal
if TYPE_CHECKING:
from b_asic.operation import Operation
Angus Lothian
committed
class Port(ABC):
    """Port interface.

    Declares what every port exposes: the operation it is connected to, its
    index, and the signals attached to it, plus the methods used to attach
    and detach those signals.
    """

    @property
    @abstractmethod
    def operation(self) -> "Operation":
        """Return the connected operation."""
        raise NotImplementedError

    @property
    @abstractmethod
    def index(self) -> int:
        """Return the index of the port."""
        raise NotImplementedError

    @property
    @abstractmethod
    def signal_count(self) -> int:
        """Return the number of connected signals."""
        raise NotImplementedError

    @property
    @abstractmethod
    def signals(self) -> Iterable[Signal]:
        """Return all connected signals."""
        raise NotImplementedError

    @abstractmethod
    def add_signal(self, signal: Signal) -> None:
        """Connect this port to the entered signal.
        If the entered signal isn't connected to this port then connect the
        entered signal to the port as well.

        Keyword arguments:
        - signal: Signal to add.
        """
        raise NotImplementedError

    @abstractmethod
    def remove_signal(self, signal: Signal) -> None:
        """Remove the signal that was entered from the port's signals.
        If the entered signal still is connected to this port then disconnect the
        entered signal from the port as well.

        Keyword arguments:
        - signal: Signal to remove.
        """
        raise NotImplementedError

    @abstractmethod
    def clear(self) -> None:
        """Remove all connected signals from the port."""
        raise NotImplementedError
class AbstractPort(Port):
    """Abstract port class.

    Handles functionality for port id and saves the connection to the parent operation.
    """

    _operation: "Operation"
    _index: int

    def __init__(self, operation: "Operation", index: int):
        """Store the parent operation and the index of the port.

        Keyword arguments:
        - operation: Operation that this port is saved as connected to.
        - index: Index of the port.
        """
        self._operation = operation
        # The index property reads this attribute, so it has to be set here.
        self._index = index

    @property
    def operation(self) -> "Operation":
        """Return the operation given to the constructor."""
        return self._operation

    @property
    def index(self) -> int:
        """Return the index given to the constructor."""
        return self._index
class SignalSourceProvider(ABC):
    """Interface for objects that can supply the source port of a signal.

    Implementers expose a single output port through the ``source`` property.
    """

    @property
    @abstractmethod
    def source(self) -> "OutputPort":
        """Return the main source port provided by this object."""
        raise NotImplementedError
Angus Lothian
committed
class InputPort(AbstractPort):
    """Input port.

    Holds at most one signal at a time, together with an optional value
    length (in bits) that received values should be truncated to.
    """

    _source_signal: Optional[Signal]
    _value_length: Optional[int]

    def __init__(self, operation: "Operation", index: int):
        """Create an unconnected input port.

        Keyword arguments:
        - operation: Operation that this port is saved as connected to.
        - index: Index of the port.
        """
        super().__init__(operation, index)
        self._source_signal = None
        # Initialised so that reading value_length before it is set yields
        # None instead of raising AttributeError.
        self._value_length = None

    @property
    def signal_count(self) -> int:
        """Return 1 if a signal is connected, otherwise 0."""
        return 0 if self._source_signal is None else 1

    @property
    def signals(self) -> Iterable[Signal]:
        """Return a list holding the connected signal, or an empty list."""
        return [] if self._source_signal is None else [self._source_signal]

    def add_signal(self, signal: Signal) -> None:
        """Connect the entered signal to this port and set this port as its destination.

        Keyword arguments:
        - signal: Signal to add. The port must not already have a signal.
        """
        assert self._source_signal is None, "Input port may have only one signal added."
        assert signal is not self._source_signal, "Attempted to add already connected signal."
        self._source_signal = signal
        signal.set_destination(self)

    def remove_signal(self, signal: Signal) -> None:
        """Disconnect the entered signal from this port.

        Keyword arguments:
        - signal: Signal to remove. Must be the currently connected signal.
        """
        assert signal is self._source_signal, "Attempted to remove already removed signal."
        # Clear our side first so the port is already unconnected when the
        # signal is told to let go of it.
        self._source_signal = None
        # Disconnect the signal's side as well, as the interface contract
        # requires; this mirrors OutputPort.remove_signal calling
        # signal.remove_source().
        # NOTE(review): assumes Signal provides remove_destination() - confirm.
        signal.remove_destination()

    def clear(self) -> None:
        """Remove the connected signal, if there is one."""
        if self._source_signal is not None:
            self.remove_signal(self._source_signal)

    @property
    def connected_source(self) -> Optional["OutputPort"]:
        """Get the output port that is currently connected to this input port,
        or None if it is unconnected.
        """
        return None if self._source_signal is None else self._source_signal.source

    def connect(self, src: SignalSourceProvider) -> Signal:
        """Connect the provided signal source to this input port by creating a new signal.
        Returns the new signal.

        Keyword arguments:
        - src: Provider whose source port the new signal starts from.
        """
        assert self._source_signal is None, "Attempted to connect already connected input port."
        return Signal(src.source, self)  # self._source_signal is set by the signal constructor.

    @property
    def value_length(self) -> Optional[int]:
        """Get the number of bits that this port should truncate received values to."""
        return self._value_length

    @value_length.setter
    def value_length(self, bits: Optional[int]) -> None:
        """Set the number of bits that this port should truncate received values to.

        Keyword arguments:
        - bits: Non-negative number of bits, or None.
        """
        assert bits is None or (isinstance(bits, int) and bits >= 0), "Value length must be non-negative."
        self._value_length = bits
class OutputPort(AbstractPort, SignalSourceProvider):
    """Output port.

    Keeps a list of connected signals; no limit is placed on how many
    signals may be attached.
    """

    _destination_signals: List[Signal]

    def __init__(self, operation: "Operation", index: int):
        """Create an output port with no connected signals.

        Keyword arguments:
        - operation: Operation that this port is saved as connected to.
        - index: Index of the port.
        """
        super().__init__(operation, index)
        self._destination_signals = []

    @property
    def signal_count(self) -> int:
        """Return the number of connected signals."""
        return len(self._destination_signals)

    @property
    def signals(self) -> Iterable[Signal]:
        """Return the list of connected signals."""
        return self._destination_signals

    def add_signal(self, signal: Signal) -> None:
        """Connect the entered signal to this port and set this port as its source.

        Keyword arguments:
        - signal: Signal to add. Must not already be connected to this port.
        """
        assert signal not in self._destination_signals, "Attempted to add already connected signal."
        self._destination_signals.append(signal)
        # Connect the signal's side as well, as the interface contract
        # requires; this is the counterpart of remove_signal calling
        # signal.remove_source().
        # NOTE(review): assumes Signal provides set_source() - confirm.
        signal.set_source(self)

    def remove_signal(self, signal: Signal) -> None:
        """Disconnect the entered signal from this port and remove its source.

        Keyword arguments:
        - signal: Signal to remove. Must be connected to this port.
        """
        assert signal in self._destination_signals, "Attempted to remove already removed signal."
        self._destination_signals.remove(signal)
        signal.remove_source()

    def clear(self) -> None:
        """Remove all connected signals from the port."""
        # Iterate over a snapshot: remove_signal mutates the underlying list.
        for signal in copy(self._destination_signals):
            self.remove_signal(signal)

    @property
    def source(self) -> "OutputPort":
        """Return this port, since an output port is its own signal source."""
        return self