0

I am trying to figure out if it is possible to configure a specific problem as a mixed-integer program. I think I am able to structure it as a continuous non-linear optimization problem, but would like to see if a MIP works better.

The basic idea is that there are several systems comprised of multiple elements. Over time, the elements degrade and certain actions/decisions can be applied at periodic intervals. Each decision choice has a different cost and impact on element condition. The goal is to define the optimal set of decisions for each element at periodic intervals. However, the decisions must not allow an overall system to degrade below a minimally acceptable level, or to cost more than a budgeted amount for each period.

Below is an example of how an example could be set up and how a single decision matrix would be evaluated:

# 1. define parameters like # of systes, decision options, decision periods, types etc.
# 2. define constraints
# 3. model how the elements degrade over time, and costs for decisions
# 4. model how elements will impact system metrics
# 5. structure as MIP <- HOW TO DO THIS??:

import random
from random import choices
import pandas as pd

# define parameters
#
decision_options = list(range(0,4)) # defines number of decision options
periods = list(range(0,6)) # defines number of decision periods
system_types = ['A', 'B', 'C'] # type used for condition constraints

# say we have 100 elements, each assigned to a system
element_ids = list(range(0, 100))


# create initialized element condition values
condition = choices(range(60,101), k=len(element_ids)) 
# assign types for each item
system_type = choices(system_types, k=len(element_ids))
# assign value for each group
value = random.sample(range(500, 20000), len(element_ids)) 

# create a dataframe with element, system type, condition, and element value information
df = pd.DataFrame({'Element_ID': element_ids, 'System_Type': system_type, 'Condition': condition, "Value": value})
df

# 2. define constraints
#
# create a dict where each type has a minimum allowable condition
vals= [60, 50, 40] # System = A has a limit of 60, System B has a limit of 50...
min_condition = dict(zip(system_types, vals))

# budget constraint for each period
max_budget = [200000] * len(periods)

# 3. model costs and degradation over time

# create a function that sets a element cost based on decision
def decision_cost(decision, value):
    cost = 0
    match decision:
        case 0:
            cost = 0 # do nothing
        case 1:
            cost = value / 10 # do a little
        case 2:
            cost = value / 5 # do a little more
        case 3:
            cost = value / 2 # do a lot
    return(cost)

# create a function that sets a element condition based on decision
def decision_result(decision, condition):
    
    match decision:
        case 0:
            condition = condition # no improvement
        case 1:
            condition = min(condition*1.1, 100) # a little improvement
        case 2:
            condition = min(condition*1.2, 100) # a little more improvement
        case 3:
            condition = min(condition*1.5, 100) # a lot of improvement
    return(condition)

# model element degradation
# element loses 10% at each period
def degrade_element(condition):
    new_condition = round(condition  *0.9,0)
    return(new_condition)

# 4. model how elements will impact system metrics
# these are to be compared to the min_condition constraints
#
# system condition is the weighted-and-summed condition of the constituent elements
def system_condition(df):
    system_types = sorted(df['System_Type'].unique())
    system_condition = [0] * len(system_types)
    
    for i in range(len(system_types)):
        system_data = df[df['System_Type'] == system_types[i]]
        system_condition[i] = (system_data['Condition'] * system_data['Value']).sum() / system_data['Value'].sum()
        
    return(system_condition)

def period_costs(new_df, periods):
    column_names = [f'Period_{p}' for p in periods]
    period_sums = []
    for col in column_names:
        period_sums.append(new_df[col].sum())
    return(period_sums)
    
system_condition(df)

# create a sample decision matrix:
# row = element
# column = period
# cell value = decision

import numpy as np

# randomly initialize a decision matrix
decision_matrix = np.random.randint(0, len(decision_options), size=(len(element_ids), max(periods)+1))

# example evaluation of a decision matrix

# get cost and result dataframes for system/element results
system_cost_df = system_result_df = pd.DataFrame(index=range(len(system_types)), columns=range(0, len(periods)))
element_cost_df = element_result_df = pd.DataFrame(index=range(len(element_ids)), columns=range(0, len(periods)))

def evaluate_decision_matrix(df, decision_matrix, periods):
    new_df = df
    #new_df['Cost'] = 0
    # create a column to collect the cost for each period
    column_names = [f'Period_{p}' for p in periods]
    new_df = new_df.assign(**{col: 0 for col in column_names})
    # for each period
    for i in range(0, len(periods)):
        # for each element
        for j in range(0, len(element_ids)):
            # execute decision
            decision = decision_matrix[j,i]
            element_value = new_df.iloc[j]['Value']
            #element_cost_df.loc[j,i] = decision_cost(decision, element_value)
            #cost = decision_cost(decision, element_value)
            new_df.loc[j, column_names[i]] = decision_cost(decision, element_value)
            # impact condition with decision 
            current_condition = new_df.iloc[j]['Condition']
            updated_condition = decision_result(decision, current_condition)
            new_df.loc[j, 'Condition'] = updated_condition
            # degrade element for next period
            degraded_condition = degrade_element(new_df.iloc[j]['Condition'])
            new_df.loc[j, 'Condition'] = degraded_condition
    return(new_df)

# evaluate decision matrix 
new_df = evaluate_decision_matrix(df, decision_matrix, periods)

# calculate system condition
system_condition_values = system_condition(new_df)

# check if system condition constraints are met
# returns true if system condition constraints are met
condition_check = [x > y for x, y in zip(system_condition_values, vals)]
print(condition_check)

# check if budget constaint is met
costs = period_costs(new_df, periods)
budget_check = [x < y for x, y in zip(costs, max_budget)]
print(budget_check)

As an example, the print(condition_check) command displays [True, True, True] indicating the system condition constraints are satisfied. But print(budget_check) shows [True, False, False, False, True, False] indicated 4 of the period budget constraints have been broken.

Is it possible to structure this as an MIP in python and, if so, how would that be done?

6
  • 1
    Well of course you could create a MIP and there are already freely-available solvers out there. The question is too broad for SO - you're asking for a ground-up solution to your entire optimisation problem. Furthermore, your existing code appears to already produce solutions that break the budget constraint; are you sure that your current code is doing what you intend before you move on (or, ask others to) to something else? Commented Apr 22 at 16:57
  • @roganjosh the code above is not producing a solution (the decision matrix is just randomly generated). The code above is meant to demonstrate how a decision matrix (generated during the MIP process) would be evaluated Commented Apr 22 at 17:01
  • What is an "element"? A stock price? Battery energy? Pickle brine? Commented Apr 24 at 0:49
  • The "condition" (another very poorly-chosen name) varies non-linearly period to period; it's multiplied by anywhere from 1 to 1.5 depending on the decision made, and this multiplication is period-cumulative. This means your problem automatically goes into MINLP - mixed integer non-linear programming - for which solvers are more difficult-to-find and much slower than linear solvers. Commented Apr 24 at 2:24
  • @Reinderien "elements" are the constituent parts of a "system". Meaning a system is comprised of elements. Each element has a condition associated with it that measures the relative health of the element on a continuous scale. Condition is the terminology used in the domain. The term seemed (perhaps wrongly) to be common enough to not be jargon. Thank you for the MINLP insight. Commented Apr 25 at 12:54

1 Answer 1

0

Your code shows a very confused idea of how optimisation works. As I mentioned in the comments, this problem is very much non-linear. I suggest that you first tackle it as a continuous problem, defining proper bounds, constraints and Jacobians.

Your use of min() will be fairly disastrous under most optimisation circumstances. Instead I suggest that you

  • Define two variable vectors: decisions, and conditions;
  • Bound the conditions to 100 without the use of min();
  • Define the budget constraint as being based on a continuous polynomial function of the decision variable;
  • Define the system constraint as being linear, having zero Hessian;
  • Maximize the condition of the last period, which will also be linear and have zero Hessian.
import time
import types
import typing

import numpy as np
import pandas as pd
import scipy.sparse as sp
from scipy.optimize import (
    approx_fprime, minimize,
    Bounds, LinearConstraint, NonlinearConstraint, OptimizeResult,
)


DECAY = 0.9


class Problem(typing.NamedTuple):
    """Problem definition, input data and supporting data"""
    n_types: int
    n_elements: int
    n_periods: int
    n_decisions: int

    systems: pd.DataFrame
    elements: pd.DataFrame
    periods: pd.DataFrame
    decisions: pd.DataFrame

    cost_poly: np.polynomial.Polynomial
    cost_gradient: np.polynomial.Polynomial
    improvement_poly: np.polynomial.Polynomial
    improvement_gradient: np.polynomial.Polynomial

    initial_condition_col: np.ndarray
    budget_indices: np.ndarray
    budget_indptr: np.ndarray
    fixed_condition_jac: np.ndarray
    fixed_condition_hess: sp.sparray
    fixed_system_jac: sp.sparray

    @property
    def system_types(self) -> pd.Index: return self.systems.index
    @property
    def element_ids(self) -> pd.Index: return self.elements.index
    @property
    def period_ids(self) -> pd.Index: return self.periods.index
    @property
    def decision_ids(self) -> pd.Index: return self.decisions.index

    @classmethod
    def build(
        cls,
        systems: pd.DataFrame,
        elements: pd.DataFrame,
        periods: pd.DataFrame,
        decisions: pd.DataFrame,
    ) -> typing.Self:
        n_decisions = len(decisions)
        n_periods = len(periods)
        n_types = len(systems)
        n_elements = len(elements)

        # simple, exact polynomial fit of the cost and improvement series for the continuous case
        cost_poly = np.polynomial.Polynomial.fit(
            decisions.index, decisions['cost'], deg=n_decisions - 1, symbol='d',
        )
        improvement_poly = np.polynomial.Polynomial.fit(
            decisions.index, decisions['improvement'], deg=n_decisions - 1, symbol='d',
        )

        fixed_condition_jac = sp.dok_matrix((1, 2*n_elements*n_periods))
        fixed_condition_jac[0, (n_elements + 1)*n_periods - 1:: n_periods] = -1
        # Though this is sparse, minimize() doesn't work unless it's returned as dense.
        fixed_condition_jac = fixed_condition_jac.toarray().squeeze()

        # Linear, fixed Jacobian, zero Hessian
        m = n_elements
        n = n_periods
        fixed_condition_hess = sp.csr_array((2*m*n, 2*m*n))

        initial_condition_col = elements['initial_condition'].values[:, np.newaxis]

        # sparse supporting data for budget constraint
        budget_indices = (
            np.arange(0, n, dtype=np.int32)[:, np.newaxis] +
            np.arange(0, m*n, n, dtype=np.int32)[np.newaxis, :]
        ).ravel()
        budget_indptr = np.arange(0, 1 + m*n, m, dtype=np.int32)

        # One-hot (n_types,n_elements) boolean array
        type_groups = (
            pd.get_dummies(elements['system_type'], sparse=True)
            .sparse.to_coo()
            .T
            .astype(np.float64)
        )

        # For each type row and element column, the corresponding value or 0 if not that type
        value_groups = type_groups.multiply(elements['value'].values)

        last_condition = np.zeros((1, n_periods))
        last_condition[:, -1] = 1
        fixed_system_jac = sp.hstack(
            (
                sp.csr_array((n_types, m*n)),  # Zero: system constraint is not affected by decisions
                sp.kron(
                    value_groups/value_groups.sum(axis=1),  # Normalize to the value group sum for each type
                    last_condition,  # Apply to the last condition column only
                ),
            ),
            format='csr',
        )

        fixed_condition_jac.flags.writeable = False
        initial_condition_col.flags.writeable = False
        budget_indices.flags.writeable = False
        budget_indptr.flags.writeable = False

        return cls(
            systems=systems, elements=elements, periods=periods, decisions=decisions,
            n_types=n_types, n_elements=n_elements, n_periods=n_periods, n_decisions=n_decisions,
            cost_poly=cost_poly, cost_gradient=cost_poly.deriv(),
            improvement_poly=improvement_poly, improvement_gradient=improvement_poly.deriv(),
            fixed_condition_jac=fixed_condition_jac, fixed_condition_hess=fixed_condition_hess,
            fixed_system_jac=fixed_system_jac, initial_condition_col=initial_condition_col,
            budget_indices=budget_indices, budget_indptr=budget_indptr,
        )

    @classmethod
    def example(
        cls,
        n_decisions: int = 4,
        n_periods: int = 6,
        n_elements: int = 100,
        rand: np.random.Generator | types.ModuleType = np.random,
    ) -> typing.Self:
        system_types = pd.Index(name='type', data=list('ABC'))
        systems = pd.DataFrame(
            index=system_types,
            data={'min_condition': [60, 50, 40]},
        )

        elements = pd.DataFrame(
            index=pd.RangeIndex(name='element', start=0, stop=n_elements),
            data={
                'system_type': rand.choice(system_types, shuffle=False, size=n_elements),
                'value': rand.integers(low=500, high=20_000, endpoint=False, size=n_elements),
                'initial_condition': rand.integers(low=60, high=100, endpoint=True, size=n_elements),
            },
        )

        periods = pd.DataFrame(
            index=pd.RangeIndex(name='period', start=0, stop=n_periods - 1),
            data={'max_budget': 200_000},
        )

        decisions = pd.DataFrame(
            index=pd.RangeIndex(name='decision', start=0, stop=n_decisions),
            data={
                'cost': [0, 0.1, 0.2, 0.5],  # coefficients on value
                'improvement': [1.0, 1.1, 1.2, 1.5],  # coefficients on condition. aka "result".
            },
        )

        return cls.build(
            systems=systems, elements=elements, periods=periods, decisions=decisions,
        )

    def vars_to_decisions(self, variables: np.ndarray) -> np.ndarray:
        """
        For all objective, constraint, and Jacobian functions: split optimization variables to
        (decision,condition)*n_elements*n_periods; the first dimension will always be unpacked
        """
        return variables.reshape((2, self.n_elements, self.n_periods))

    def condition_objective_continuous(self, variables: np.ndarray) -> float:
        """Optimise for the end-state condition (after last decision) only"""
        decision, condition = self.vars_to_decisions(variables)
        return -condition[:, -1].sum()   # negative to maximize condition

    def condition_jacobian(self, variables: np.ndarray) -> np.ndarray:
        return self.fixed_condition_jac

    def condition_hessian(self, variables: np.ndarray) -> sp.sparray:
        return self.fixed_condition_hess

    def budget_constraint(self, variables: np.ndarray) -> np.ndarray:
        """Cost subtotals for each period"""
        decision, condition = self.vars_to_decisions(variables)
        return self.elements['value'] @ self.cost_poly(decision)

    def budget_jacobian(self, variables: np.ndarray) -> sp.sparray:
        decision, condition = self.vars_to_decisions(variables)
        grad = self.cost_gradient(decision)
        m, n = grad.shape
        return sp.csr_array(
            (
                (grad.T * self.elements['value'].values).ravel(),  # data
                self.budget_indices, self.budget_indptr,
            ),
            shape=(n, 2*m*n),
        )

    def improvement_constraint(self, variables: np.ndarray) -> np.ndarray:
        """cond_{i+1} <= cond_i * DECAY * improvement_poly(decision)"""
        decision, condition = self.vars_to_decisions(variables)
        prev_condition = np.hstack((self.initial_condition_col, condition[:, :-1]))
        improvement = self.improvement_poly(decision)
        return (DECAY*improvement*prev_condition - condition).ravel()

    def improvement_jacobian(self, variables: np.ndarray) -> sp.sparray:
        decision, condition = self.vars_to_decisions(variables)
        prev_condition = np.hstack((self.initial_condition_col, condition[:, :-1]))
        lhs = sp.diags_array(
            DECAY*(
                prev_condition*self.improvement_gradient(decision)
            ).ravel(),
        )
        improvement = np.zeros((self.n_elements, self.n_periods))
        improvement[:, 1:] = DECAY*self.improvement_poly(decision[:, 1:])
        rhs = sp.diags_array(
            (
                np.full(shape=self.n_elements*self.n_periods, fill_value=-1),
                improvement.ravel()[1:],
            ),
            offsets=(0, -1),
        )
        return sp.hstack((lhs, rhs), format='csr')

    def test_jacobian(
        self, rand: np.random.Generator | types.ModuleType = np.random,
    ) -> None:
        """
        Doesn't help with solution. Only written to confirm that the Jacobian definitions are
        probably correct.
        """

        x0 = rand.integers(
            low=[[[self.decision_ids[0]]], [[60]]],
            high=[[[self.decision_ids[-1]]], [[100]]],
            endpoint=True,
            size=(2, self.n_elements, self.n_periods),
        ).ravel()

        # check_grad has a bug where the **2 is taken as a matrix power in the sparse case, so don't use it
        epsilon = np.sqrt(np.finfo(float).eps)

        approximate = approx_fprime(xk=x0, f=self.condition_objective_continuous, epsilon=epsilon)
        analytic = self.condition_jacobian(x0)
        error = analytic - approximate
        assert error.min() > -1e-12
        assert error.max() < +1e-12

        approximate = approx_fprime(xk=x0, f=self.budget_constraint, epsilon=epsilon)
        analytic = self.budget_jacobian(x0)
        error = analytic - approximate
        assert error.min() > -1e-2
        assert error.max() < +1e-2

        approximate = approx_fprime(xk=x0, f=self.improvement_constraint, epsilon=epsilon)
        analytic = self.improvement_jacobian(x0)
        error = analytic - approximate
        assert error.min() > -1e-5
        assert error.max() < +1e-5

    def solve_continuous(
        self, method: typing.Literal['slsqp', 'trust-constr'] = 'trust-constr',
    ) -> 'Solution':
        """
        Continuous form
        Maximize final-state improvement
        s.t. cost <= budget, condition >= min_condition, condition <= 100
        Variables:
        - continuous decisions for all elements and all non-initial periods
        - continuous conditions for all elements and all non-initial periods
        """

        # as a default, maintain all conditions at their initial value
        zeros = np.zeros(shape=self.n_elements*self.n_periods)
        x0 = np.concat((
            np.full_like(zeros, fill_value=self.decision_ids[0]),
            np.repeat(self.elements['initial_condition'].values, repeats=self.n_periods),
        ))

        budget_constraint = NonlinearConstraint(
            fun=self.budget_constraint,
            jac=self.budget_jacobian,
            lb=-np.inf, ub=self.periods['max_budget'].values,
        )
        improvement_constraint = NonlinearConstraint(
            fun=self.improvement_constraint,
            jac=self.improvement_jacobian,
            lb=0, ub=np.inf,
        )
        system_constraint = LinearConstraint(
            A=self.fixed_system_jac,
            lb=self.systems['min_condition'].values, ub=np.inf,
        )

        bounds = Bounds(
            lb=np.concat((
                np.full_like(zeros, fill_value=self.decision_ids[0]),
                zeros,
            )),
            ub=np.concat((
                np.full_like(zeros, fill_value=self.decision_ids[-1]),
                np.full_like(zeros, fill_value=100),
            )),
        )

        start = time.perf_counter()
        result = minimize(
            method=method,
            fun=self.condition_objective_continuous,
            jac=self.condition_jacobian,
            hess=self.condition_hessian,
            x0=x0,
            bounds=bounds,
            constraints=(budget_constraint, improvement_constraint, system_constraint),
        )
        stop = time.perf_counter()

        if not result.success:
            raise ValueError(result.message)

        return Solution.build(
            problem=self, result=result, dur=stop - start,
        )


class Solution(typing.NamedTuple):
    result: OptimizeResult
    dur: float
    decisions: pd.DataFrame
    periods: pd.DataFrame
    systems: pd.DataFrame
    conditions: pd.DataFrame

    @classmethod
    def build(
        cls,
        problem: Problem,
        result: OptimizeResult,
        dur: float,
    ) -> typing.Self:
        decision_array, condition_array = problem.vars_to_decisions(result.x)
        decisions = pd.DataFrame(
            index=problem.element_ids,
            columns=problem.period_ids,
            data=decision_array,
        )
        periods = problem.periods.assign(cost=problem.budget_constraint(result.x))
        systems = problem.systems.assign(condition=problem.fixed_system_jac @ result.x)
        conditions = pd.concat(
            (
                problem.elements['initial_condition'].rename(problem.period_ids[0] - 1),
                pd.DataFrame(
                    index=problem.element_ids,
                    columns=problem.period_ids,
                    data=condition_array,
                ),
            ),
            axis=1,
        )
        conditions.columns.name = 'period'

        return cls(
            result=result, dur=dur,
            decisions=decisions, periods=periods, systems=systems, conditions=conditions,
        )

    def describe(self) -> str:
        return (
            f'condition score={-self.result.fun:.2f} '
            f'in {self.result.nit} iterations, {self.dur:.2f} s: '
            f'{self.result.message}'
            f'\n'
            f'\nPeriod costs:'
            f'\n{self.periods}'
            f'\n'
            f'\nSystem condition groups in final period:'
            f'\n{self.systems}'
        )
        # The entire decision and condition tables are not printed here


def main() -> None:
    print('Setting up...')
    rand = np.random.default_rng(seed=0)
    problem = Problem.example(rand=rand)
    problem.test_jacobian(rand=rand)
    print('cost = ', problem.cost_poly)
    print('improvement =', problem.improvement_poly)
    print()

    print('Solving...')
    solution = problem.solve_continuous()
    print(solution.describe())


if __name__ == '__main__':
    main()
Setting up...
cost =  0.1375 + 0.1375 (-1.0 + 0.66666667d) + 0.1125 (-1.0 + 0.66666667d)**2 +
0.1125 (-1.0 + 0.66666667d)**3
improvement = 1.1375 + 0.1375 (-1.0 + 0.66666667d) + 0.1125 (-1.0 + 0.66666667d)**2 +
0.1125 (-1.0 + 0.66666667d)**3

Solving...
condition score=10000.00 in 415 iterations, 17.36 s: `gtol` termination condition is satisfied.

Period costs:
        max_budget           cost
period                           
0           200000  199847.171531
1           200000  199855.522165
2           200000  199867.822110
3           200000  199889.177843
4           200000  199911.476306

System condition groups in final period:
      min_condition  condition
type                          
A                60      100.0
B                50      100.0
C                40      100.0

The results are more interesting (and the solver more stressed) if you decrease the maximum budget from 200k to only 30k and pass to minimize

            options={
                'maxiter': 2_000,
                'verbose': 2,
                'gtol': 1e-1,
            },

a solution is still found:

condition score=6640.61 in 851 iterations, 26.29 s: `gtol` termination condition is satisfied.

Period costs:
        max_budget          cost
period                          
0            30000  29987.271349
1            30000  29987.513648
2            30000  29987.937541
3            30000  29988.420587
4            30000  29989.071768

System condition groups in final period:
      min_condition  condition
type                          
A                60  60.007024
B                50  51.626549
C                40  55.288215

From here, you can discretise the decision variable by using the above as an inner step in differential_evolution and passing integrality.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.