summary refs log tree commit diff
path: root/finance/flow.py
blob: c6b28c051d9347d298bbd6e4ca0d9f0c0e439369 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import dataclasses
from datetime import datetime
from decimal import Decimal
from typing import Generator


@dataclasses.dataclass(kw_only=True, frozen=True)
class Flow:
    """Time-discrete flow of money paid on the first day of a month"""

    amount: Decimal
    since: None | datetime
    until: None | datetime

    def integrate(self, start: datetime, end: datetime) -> Decimal:
        """Integrate the flow between two dates to an amount of money"""

        payments: int = 0

        if self.since is not None:
            if start < self.since:
                start = self.since

        if self.until is not None:
            if end > self.until:
                end = self.until

        for candidate in monthly_candidates(start):
            if start <= candidate <= end:
                payments += 1
            if candidate > end:
                break

        return self.amount * Decimal(payments)


def monthly_candidates(start: datetime) -> Generator[datetime, None, None]:
    """Yield the first day of every month, beginning with the month of ``start``.

    The first value is midnight on the first day of ``start``'s own month
    (which may lie before ``start`` itself); every following value is midnight
    on the first day of the next month. The generator never finishes on its
    own, so the caller has to break out of the iteration.

    The ``tzinfo`` of ``start`` is carried over to every yielded value, so the
    candidates stay comparable with ``start`` whether it is naive or aware.
    """
    year, month = start.year, start.month
    while True:
        yield datetime(year, month, 1, tzinfo=start.tzinfo)
        if month == 12:
            # Roll over from December to January of the following year.
            year, month = year + 1, 1
        else:
            month += 1


def simulate(
    start: datetime, end: datetime, flows: tuple[Flow, ...]
) -> list[tuple[datetime, Decimal]]:
    """Accumulate all flows month by month between two dates.

    For every first-of-month date within ``[start, end]`` (both ends
    inclusive) the result holds a pair of that date and the sum of each flow
    integrated from ``start`` up to that date.
    """
    month_starts: list[datetime] = []
    for month_start in monthly_candidates(start):
        if month_start > end:
            # Candidates only ever increase, so the range is exhausted.
            break
        if start <= month_start:
            month_starts.append(month_start)

    return [
        (
            month_start,
            sum(
                (flow.integrate(start, month_start) for flow in flows),
                Decimal(0),
            ),
        )
        for month_start in month_starts
    ]