Source code for oemof.solph.models

# -*- coding: utf-8 -*-

"""Solph Optimization Models.

SPDX-FileCopyrightText: Uwe Krien <krien@uni-bremen.de>
SPDX-FileCopyrightText: Simon Hilpert
SPDX-FileCopyrightText: Cord Kaldemeyer
SPDX-FileCopyrightText: gplssm
SPDX-FileCopyrightText: Patrik Schönfeldt

SPDX-License-Identifier: MIT

"""
import logging
import warnings

from pyomo import environ as po
from pyomo.core.plugins.transform.relax_integrality import RelaxIntegrality
from pyomo.opt import SolverFactory

from oemof.solph import blocks
from oemof.solph import processing
from oemof.solph.plumbing import sequence


[docs]class BaseModel(po.ConcreteModel): """The BaseModel for other solph-models (Model, MultiPeriodModel, etc.) Parameters ---------- energysystem : EnergySystem object Object that holds the nodes of an oemof energy system graph constraint_groups : list (optional) Solph looks for these groups in the given energy system and uses them to create the constraints of the optimization problem. Defaults to `Model.CONSTRAINTS` objective_weighting : array like (optional) Weights used for temporal objective function expressions. If nothing is passed `timeincrement` will be used which is calculated from the freq length of the energy system timeindex . auto_construct : boolean If this value is true, the set, variables, constraints, etc. are added, automatically when instantiating the model. For sequential model building process set this value to False and use methods `_add_parent_block_sets`, `_add_parent_block_variables`, `_add_blocks`, `_add_objective` Attributes: ----------- timeincrement : sequence Time increments. flows : dict Flows of the model. name : str Name of the model. es : solph.EnergySystem Energy system of the model. meta : `pyomo.opt.results.results_.SolverResults` or None Solver results. dual : ... or None rc : ... or None """ CONSTRAINT_GROUPS = [] def __init__(self, energysystem, **kwargs): super().__init__() # ######################## Arguments ################################# self.name = kwargs.get("name", type(self).__name__) self.es = energysystem self.timeincrement = sequence( kwargs.get("timeincrement", self.es.timeincrement) ) if self.timeincrement[0] is None: try: self.timeincrement = sequence( self.es.timeindex.freq.nanos / 3.6e12 ) except AttributeError: msg = ( "No valid time increment found. Please pass a valid " "timeincremet parameter or pass an EnergySystem with " "a valid time index. Please note that a valid time" "index need to have a 'freq' attribute." ) raise AttributeError(msg) self.objective_weighting = kwargs.get( "objective_weighting", self.timeincrement ) self._constraint_groups = type(self).CONSTRAINT_GROUPS + kwargs.get( "constraint_groups", [] ) self._constraint_groups += [ i for i in self.es.groups if hasattr(i, "CONSTRAINT_GROUP") and i not in self._constraint_groups ] self.flows = self.es.flows() self.solver_results = None self.dual = None self.rc = None if kwargs.get("auto_construct", True): self._construct() def _construct(self): """ """ self._add_parent_block_sets() self._add_parent_block_variables() self._add_child_blocks() self._add_objective() def _add_parent_block_sets(self): """ " Method to create all sets located at the parent block, i.e. the model itself as they are to be shared across all model components. """ pass def _add_parent_block_variables(self): """ " Method to create all variables located at the parent block, i.e. the model itself as these variables are to be shared across all model components. """ pass def _add_child_blocks(self): """Method to add the defined child blocks for components that have been grouped in the defined constraint groups. """ for group in self._constraint_groups: # create instance for block block = group() # Add block to model self.add_component(str(block), block) # create constraints etc. related with block for all nodes # in the group block._create(group=self.es.groups.get(group)) def _add_objective(self, sense=po.minimize, update=False): """Method to sum up all objective expressions from the child blocks that have been created. This method looks for `_objective_expression` attribute in the block definition and will call this method to add their return value to the objective function. """ if update: self.del_component("objective") expr = 0 for block in self.component_data_objects(): if hasattr(block, "_objective_expression"): expr += block._objective_expression() self.objective = po.Objective(sense=sense, expr=expr)
[docs] def receive_duals(self): """Method sets solver suffix to extract information about dual variables from solver. Shadow prices (duals) and reduced costs (rc) are set as attributes of the model. """ # shadow prices self.dual = po.Suffix(direction=po.Suffix.IMPORT) # reduced costs self.rc = po.Suffix(direction=po.Suffix.IMPORT)
[docs] def results(self): """Returns a nested dictionary of the results of this optimization""" return processing.results(self)
[docs] def solve(self, solver="cbc", solver_io="lp", **kwargs): r"""Takes care of communication with solver to solve the model. Parameters ---------- solver : string solver to be used e.g. "glpk","gurobi","cplex" solver_io : string pyomo solver interface file format: "lp","python","nl", etc. \**kwargs : keyword arguments Possible keys can be set see below: Other Parameters ---------------- solve_kwargs : dict Other arguments for the pyomo.opt.SolverFactory.solve() method Example : {"tee":True} cmdline_options : dict Dictionary with command line options for solver e.g. {"mipgap":"0.01"} results in "--mipgap 0.01" {"interior":" "} results in "--interior" Gurobi solver takes numeric parameter values such as {"method": 2} """ solve_kwargs = kwargs.get("solve_kwargs", {}) solver_cmdline_options = kwargs.get("cmdline_options", {}) opt = SolverFactory(solver, solver_io=solver_io) # set command line options options = opt.options for k in solver_cmdline_options: options[k] = solver_cmdline_options[k] solver_results = opt.solve(self, **solve_kwargs) status = solver_results["Solver"][0]["Status"] termination_condition = solver_results["Solver"][0][ "Termination condition" ] if status == "ok" and termination_condition == "optimal": logging.info("Optimization successful...") else: msg = ( "Optimization ended with status {0} and termination " "condition {1}" ) warnings.warn( msg.format(status, termination_condition), UserWarning ) self.es.results = solver_results self.solver_results = solver_results return solver_results
[docs] def relax_problem(self): """Relaxes integer variables to reals of optimization model self.""" relaxer = RelaxIntegrality() relaxer._apply_to(self) return self
[docs]class Model(BaseModel): """An energy system model for operational and investment optimization. Parameters ---------- energysystem : EnergySystem object Object that holds the nodes of an oemof energy system graph constraint_groups : list Solph looks for these groups in the given energy system and uses them to create the constraints of the optimization problem. Defaults to `Model.CONSTRAINTS` **The following basic sets are created**: NODES : A set with all nodes of the given energy system. TIMESTEPS : A set with all timesteps of the given time horizon. FLOWS : A 2 dimensional set with all flows. Index: `(source, target)` **The following basic variables are created**: flow Flow from source to target indexed by FLOWS, TIMESTEPS. Note: Bounds of this variable are set depending on attributes of the corresponding flow object. """ CONSTRAINT_GROUPS = [ blocks.Bus, blocks.Transformer, blocks.InvestmentFlow, blocks.Flow, blocks.NonConvexFlow, ] def __init__(self, energysystem, **kwargs): super().__init__(energysystem, **kwargs) def _add_parent_block_sets(self): """ """ # set with all nodes self.NODES = po.Set(initialize=[n for n in self.es.nodes]) # pyomo set for timesteps of optimization problem self.TIMESTEPS = po.Set( initialize=range(len(self.es.timeindex)), ordered=True ) # previous timesteps previous_timesteps = [x - 1 for x in self.TIMESTEPS] previous_timesteps[0] = self.TIMESTEPS.last() self.previous_timesteps = dict(zip(self.TIMESTEPS, previous_timesteps)) # pyomo set for all flows in the energy system graph self.FLOWS = po.Set( initialize=self.flows.keys(), ordered=True, dimen=2 ) self.BIDIRECTIONAL_FLOWS = po.Set( initialize=[ k for (k, v) in self.flows.items() if hasattr(v, "bidirectional") ], ordered=True, dimen=2, within=self.FLOWS, ) self.UNIDIRECTIONAL_FLOWS = po.Set( initialize=[ k for (k, v) in self.flows.items() if not hasattr(v, "bidirectional") ], ordered=True, dimen=2, within=self.FLOWS, ) def _add_parent_block_variables(self): """ """ self.flow = po.Var(self.FLOWS, self.TIMESTEPS, within=po.Reals) for (o, i) in self.FLOWS: if self.flows[o, i].nominal_value is not None: if self.flows[o, i].fix[self.TIMESTEPS[1]] is not None: for t in self.TIMESTEPS: self.flow[o, i, t].value = ( self.flows[o, i].fix[t] * self.flows[o, i].nominal_value ) self.flow[o, i, t].fix() else: for t in self.TIMESTEPS: self.flow[o, i, t].setub( self.flows[o, i].max[t] * self.flows[o, i].nominal_value ) if not self.flows[o, i].nonconvex: for t in self.TIMESTEPS: self.flow[o, i, t].setlb( self.flows[o, i].min[t] * self.flows[o, i].nominal_value ) elif (o, i) in self.UNIDIRECTIONAL_FLOWS: for t in self.TIMESTEPS: self.flow[o, i, t].setlb(0) else: if (o, i) in self.UNIDIRECTIONAL_FLOWS: for t in self.TIMESTEPS: self.flow[o, i, t].setlb(0)