#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2018 Gao Tang <gt70@duke.edu>
#
# Distributed under terms of the MIT license.
"""
trajOptCollocationProblem.py
This class implements the direct collocation approach for humanoid trajectory optimization
"""
from __future__ import division
import numpy as np
from scipy.interpolate import interp1d
from .trajOptBase import linearObj, linearPointObj
from .trajOptBase import linearPointConstr, linearConstr
from .trajOptBase import nonLinearPointObj, nonLinearObj
from .trajOptBase import nonLinearPointConstr, nonLinearConstr
from .trajOptBase import lqrObj, quadPenalty
from .trajOptBase import addX
from .trajOptBase import daeSystem
from . import SnoptConfig, probFun, solver
from .utility import randomGenInBound, checkInBounds, interp
from scipy.sparse import spmatrix, coo_matrix, csr_matrix
[docs]class trajOptCollocProblem(probFun):
"""A class for definition of trajectory optimization problem using collocation constraints.
A general framework for using this class is to:
1. Define a class which implements a DAE system, f(t, x, p, u)=0. This is a typical case for humanoid problem, but also flexible enough for simpler first order system.
2. Optionally, write desired cost function by subclass/creating from the list of available cost functions. This can be built incrementally by adding pieces.
3. Optionally, write desired constraint functions by subclass from available constraints. This can be built incrementally by adding pieces. Our approach can correctly detect the Jacobian structure.
4. Create this class with selected system, discretization, t0, tf range
5. Set bounds for state, control, parameters, x0 and xf
6. Add objective functions and constraints to this class
7. Call preProcess method explicitly
8. Create snoptConfig instance and choose desired options
9. Construct the solver
10. Use the solver to solve with either automatic guess or user provided guess
The system dynamics constraints are imposed using direct collocation approach with some additional optimization
variables as suggested by others.
"""
[docs] def __init__(self, sys, N, t0, tf, addx=None):
"""Initialize problem by system, discretization grid size, and allowable time
Change history: now I remove gradmode option since I require the gradient be provided analytically all the time.
I remove unnecessary linear objective functions.
I reorder variable so q, dq, ddq, u, p are at consecutive place.
:param sys: system, describe system dynamics
:param N: int, discretization grid size, a uniform grid
:param t0: float/array like, allowable t0
:param tf: float/array like, allowable tf
:param addX: list of addX / one addX / None, additional optimization variables.
"""
assert isinstance(sys, daeSystem)
self.sys = sys
self.N = N
self.nPoint = 2 * self.N - 1
numT = self._handleTime(t0, tf)
self.dimx = sys.nx
self.dimdyn = sys.nf # dimension of dynamics constraint
self.dimu = sys.nu
self.dimp = sys.np
self.dimpoint = sys.nx + sys.nu + sys.np # each point have those variables
self.daeOrder = sys.order
if self.daeOrder == 1:
self.dimq = self.dimdyn // 2
elif self.daeOrder == 2:
self.dimq = self.dimdyn
self.ubd = [None, None]
self.xbd = [None, None]
self.pbd = [None, None]
self.x0bd = [None, None]
self.xfbd = [None, None]
# lqr cost function
self.lqrObj = None
self.LQRnG = 0
# Linear cost function
self.linearObj = [] # stores general linear cost
self.linPointObj = [] # stores linear cost imposed at a point
self.linPathObj = [] # stores Lagrange integral cost
# nonlinear cost function
self.nonLinObj = [] # stores general nonlinear cost
self.nonPointObj = [] # stores nonlinear cost imposed at a point
self.nonPathObj = [] # stores Lagrange integral cost. Includes LQR cost
# nonlinear constraints. Linear constraints are treated as nonlinear
self.pointConstr = [] # general constraint imposed at a certain point, such as initial and final point
self.pathConstr = [] # general constraint imposed everywhere such as collision avoidance
self.pathConstrIndexPairs = [] # this records the indexes that path constraints are imposed
self.nonLinConstr = [] # stores general nonlinear constraint
self.linPointConstr = []
self.linPathConstr = []
self.linearConstr = []
# calculate number of variables to be optimized, time are always the last
numX = self.nPoint * self.dimx
numU = self.nPoint * self.dimu
numP = self.nPoint * self.dimp
if addx is None:
self.lenAddX = 0
else:
if not isinstance(addx, list):
addx = [addx]
for tmp in addx:
assert isinstance(tmp, addX)
self.addX = addx
self.lenAddX = sum([tmp.n for tmp in addx])
numSol = numX + numU + numP + numT
self.numX = numX
self.numU = numU
self.numP = numP
self.numT = numT
self.numDynVar = numX + numU + numP # numDynVar includes q, dq, ddq, u, p
self.numTraj = numSol # make it clear, numTraj contains numDynVar + time
self.numSol = numSol + self.lenAddX
self.t0ind, self.tfind = self.__getTimeIndices()
self.colloc_constr_is_on = False
[docs] def pre_process(self, colloc_constr_is_on=False, defect_u=True, defect_p=True):
"""Initialize the instances of probFun now we are ready.
Call this function after the objectives and constraints have been set appropriately.
It calculate the space required for SNOPT and allocates sparsity structure if necessary.
:param colloc_constr_is_on: bool, if we also impose constraints on those collocation points.
:param defect_u: bool, if we want to impose defect constraint on u, i.e. umid=(u0+uf)/2
:param defect_p: bool, if we want to impose defect constraint on p, i.e. pmid=(p0+pf)/2
**Caveat** it might make problem over-constrained, if the path constraints are equality constraints.
"""
self.defectU = defect_u
self.defectP = defect_p
self.colloc_constr_is_on = colloc_constr_is_on
numDyn = self.dimdyn * self.nPoint # constraints from system dynamics, they are imposed everywhere
dynDefectSize = 2 * self.daeOrder * self.dimdyn
self.dynDefectSize = dynDefectSize
defectSize = dynDefectSize
if defect_u:
defectSize += self.dimu
if defect_p:
defectSize += self.dimp
self.defectSize = defectSize
numDefectDyn = (self.N - 1) * defectSize # from enforcing those guys
self.numDefectDyn = numDefectDyn
self.numDyn = numDyn + numDefectDyn # from both nonlinear dynamics and linear defect constraints
# constrain t0 and tf
self._set_t0_tf_constr()
numC, nnonlincon, nlincon = self.__sumConstrNum__()
self.numLinCon = nlincon
self.numNonLinCon = nnonlincon
self.__findMaxNG__()
self.numF = 1 + numDyn + numDefectDyn + numC
# analyze all objective functions in order to detect pattern for A, and additional variables for other nonlinear objective function
spA, addn = self.__analyzeObj__(self.numSol, self.numF)
self.objaddn = addn # this is important for multiple objective function support
self.numSol += addn
self.numF += addn
probFun.__init__(self, self.numSol, self.numF) # not providing G means we use finite-difference
# we are ready to write Aval, Arow, Acol for this problem. They are arranged right after dynamics
self.__setAPattern__(numDyn, nnonlincon, spA)
self.__setXbound__()
self.__setFbound__()
# detect gradient information
randX = self.randomGenX()
self.__turnOnGrad__(randX)
def _handleTime(self, t0, tf):
"""Deal with time settings.
If t0 and tf are both free, we want tf to be greater than t0
:param t0: float/array like, allowable t0
:param tf: float/array like, allowable tf
:return: int, number of variable associated with time
"""
self.tf = tf
self.t0 = t0
numT = 2
if np.isscalar(tf):
self.fixtf = True
numT -= 1
else:
self.fixtf = False
assert tf[0] <= tf[1]
if np.isscalar(t0):
self.fixt0 = True
numT -= 1
else:
self.fixt0 = False
assert t0[0] <= t0[1]
if self.fixt0 and self.fixtf:
self.fixTimeMode = True
else:
self.fixTimeMode = False
return numT
def _set_t0_tf_constr(self):
"""Based on occasions, we set constraints on time."""
if self.fixt0:
if not self.fixtf:
self.tf[0] = max(self.tf[0], self.t0 + 1e-6)
else:
if self.fixtf:
self.t0[1] = min(self.t0[1], self.tf - 1e-6)
else:
if self.t0[1] > self.tf[0]: # we might have trouble
a = np.zeros((1, self.numSol))
a[0, self.t0ind] = 1
a[0, self.tfind] = -1
self.addConstr(linearConstr(a, -1e20*np.ones(1), 1e-6*np.ones(1)))
def __sumConstrNum__(self):
"""It simply calculate constraint numbers."""
numC = 0
for constr in self.pointConstr:
numC += constr.nf
for constr, (start, end) in zip(self.pathConstr, self.pathConstrIndexPairs):
tmpN = end - start
if self.colloc_constr_is_on:
numC += (2*tmpN - 1) * constr.nf
else:
numC += tmpN * constr.nf
for constr in self.nonLinConstr:
numC += constr.nf
nnonlincon = numC
for constr in self.linPointConstr:
numC += constr.A.shape[0]
for constr in self.linPathConstr:
if self.colloc_constr_is_on:
numC += constr.A.shape[0] * self.nPoint
else:
numC += constr.A.shape[0] * self.N
for constr in self.linearConstr:
numC += constr.A.shape[0]
nlincon = numC - nnonlincon
return numC, nnonlincon, nlincon
[docs] def genGuessFromTraj(self, X=None, U=None, P=None, t0=None, tf=None, addx=None, tstamp=None, obj=None, interp_kind='linear'):
"""Generate an initial guess for the problem with user specified information.
An amazing feature is the user does not have to give a solution of exactly the same time-stamped trajectory used internally.
Interpolation approaches are used in such scenarios.
The user is not required to provide them all, although we suggest they do so.
:param X: ndarray, (x, x) each row corresponds to a state snapshot (if tstamp is None, assume equidistant grid). Column size can be dimx or dimx/sys.order
:param U: ndarray, (x, dimu) each row corresponds to a control snapshot. Similar to X but with column size equals dimu
:param P: ndarray, (x, dimp) each row corresponds to a parameter snapshot. Similar to X but with column size equals dimp
:param t0/tf: float/array-like, initial/final time. If None, we randomly generate one
:param addx: list of ndarray, guess of addx, if applicable
:param tstamp: ndarray, (x,), None if the X/U/P are provided using equidistant grid.
:param obj: ndarray, (x,) the objective part
:param interp_kind: str, interpolation type for scipy.interpolate.interp1d, can be (‘linear’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’)
"""
randX = 2 * np.random.random(self.numSol) - 1
Xtarget, Utarget, Ptarget = self.__parseX__(randX)
if obj is not None:
obj_ = self.__parseObj__(randX)
obj_[:] = obj
# generate t0 and tf, if applicable
if self.t0ind > 0:
if t0 is None:
randX[self.t0ind] = randomGenInBound(self.t0)
else:
randX[self.t0ind] = randomGenInBound(t0)
uset0 = randX[self.t0ind]
else:
uset0 = self.t0
if self.tfind > 0:
if tf is None:
randX[self.tfind] = randomGenInBound(self.tf)
else:
randX[self.tfind] = randomGenInBound(tf)
usetf = randX[self.tfind]
else:
usetf = self.tf
teval = np.linspace(uset0, usetf, self.nPoint)
# interpolation for state variables
nPoint = self.nPoint
dimx = self.dimx
dimx_ = dimx // (self.sys.order + 1)
if X is not None:
Xcol = X.shape[1]
if not (Xcol == dimx_ or Xcol == dimx):
print('The column of X is not %d or %d, not use it' % (dimx, dimx_))
X = None
else: # use interpolation to do it
interp(tstamp, X, teval, Xtarget, interp_kind)
if X is None:
# straight path go there
for i in range(self.nPoint):
Xtarget[i] = randomGenInBound(self.xbd, self.dimx)
# randomly generate x0 and xf
Xtarget[0, :dimx] = randomGenInBound(self.x0bd, self.dimx)
Xtarget[-1, :dimx] = randomGenInBound(self.xfbd, self.dimx)
# interpolation for control variable
if U is not None:
interp(tstamp, U, teval, Utarget, interp_kind)
else:
for i in range(nPoint):
Utarget[i] = randomGenInBound(self.ubd, self.dimu)
if self.numP > 0:
if P is not None:
interp(tstamp, P, teval, Ptarget, interp_kind)
else:
for i in range(nPoint):
Ptarget[i] = randomGenInBound(self.pbd, self.dimp)
# generate for
if self.lenAddX > 0:
if addx is None:
for field, addx_ in zip(self.__parseAddX__(randX), self.addX):
field[:] = randomGenInBound([addx_.lb, addx_.ub], addx_.n)
else:
for guess, field, addx_ in zip(addx, self.__parseAddX__(randX), self.addX):
field[:] = randomGenInBound(guess, addx_.n)
# I do not have to worry about objaddn since they are linear
return randX
[docs] def genGuessFromSol(self, parsed_sol):
"""Generate an initial guess from a previous solution. Mainly change grid size or add perturbation. But determining structure is difficult
:param parsed_sol: dictionary, output of calling parseSol
"""
t = parsed_sol['t']
x = parsed_sol['x']
u = parsed_sol['u']
p = parsed_sol.get('p', None)
addx = parsed_sol.get('addx', None)
obj = parsed_sol.get('obj', None)
return self.genGuessFromTraj(X=x, U=u, P=p, t0=t[0], tf=t[-1], addx=addx, tstamp=t, obj=obj, interp_kind='cubic')
def __findMaxNG__(self):
"""Loop over all the constraints, find max NG. We then create temporary data for them."""
maxnG = 0
maxnG = max(maxnG, self.sys.nG)
for constr in self.pointConstr:
maxnG = max(maxnG, constr.nG)
for constr in self.pathConstr:
maxnG = max(maxnG, constr.nG)
for constr in self.nonLinConstr:
maxnG = max(maxnG, constr.nG)
self.G = np.zeros(maxnG)
self.row = np.zeros(maxnG, dtype=int)
self.col = np.zeros(maxnG, dtype=int)
def __analyzeObj__(self, numSol, numF):
"""Analyze the objective function.
:param numSol: current estimation of free variables
:param numF: current estimation of rows of constraints
:return spA: coo sparse matrix, records first row of A, and last rows of A
:return addn: int, additional nonlinear constraint. As long as nonlinear obj exists, this is non-zero
"""
# detect how many nonlinear objective functions we have
if self.lqrObj is None:
nnlin = 0
else:
nnlin = 1
nnlin += len(self.nonLinObj) + len(self.nonPathObj) + len(self.nonPointObj)
addn = nnlin
# analyze the linear objective functions in a good way
A = np.zeros(numSol)
for obj in self.linearObj:
A[obj.A.col] += obj.A.data
for obj in self.linPointObj:
A[self.__patchCol__(obj.index, obj.A.col)] += obj.A.data
for obj in self.linPathObj: # this is not particularly useful, I have to say
for i in range(self.nPoint):
A[self.__patchCol__(i, obj.A.col)] += obj.A.data
# get sparse representation of A
nnzind = np.nonzero(A)[0]
A_ = np.zeros(2 * addn)
row_ = np.zeros(2 * addn, dtype=int)
col_ = np.zeros(2 * addn, dtype=int)
# for the addn
for i in range(addn):
A_[i] = 1
A_[addn + i] = -1
col_[i] = numSol + i
col_[addn + i] = numSol + i
row_[i] = 0
row_[addn + i] = numF + i
# concat them
catA = np.concatenate((A[nnzind], A_))
catArow = np.concatenate((np.zeros(len(nnzind), dtype=int), row_))
catAcol = np.concatenate((nnzind, col_))
if catA.shape[0] == 0:
spA = coo_matrix(([], ([], [])), shape=(addn + numF, numSol))
else:
spA = coo_matrix((catA, (catArow, catAcol)))
return spA, addn
def __setAPattern__(self, ndyncon, nnonlincon, spA):
"""Set sparsity pattern from linear constraints and objective functions.
It finds sparsity pattern from defect constraints, linear constraints, and objective functions.
It also finds sparsity pattern from those linear constraints.
The A matrix from objective function is given in the sparse A and we just append it.
The rows from linear constraints are straightforward.
The constraints from defect constraints lie from rows 1 + ndyncon and occupies defectSize rows
After nnonlincon rows (empty in A), we set linear constraints.
The size of defect A: dynDefectSize = 2 * daeOrder * dimdyn and defectSize = dynDefectSize + dimu + dimp
It sums up to 3*(dimu + dimp) + daeOrder*5*2*dimdyn nnz
We have to do this for self.N - 1 mid-points.
:param ndyncon: int, describes how many dynamics constraints we have
:param nnonlincon: int, describes how many nonlinear constraints we have
:param spA: sparse matrix, how the objective function is described linearly.
"""
curRow, A, row, col = self.__setDefectPattern__(ndyncon, self.defectU, self.defectP)
curRow += nnonlincon
# we are ready to parse linear constraints
lstCA, lstCArow, lstCAcol = self.__parseLinearConstraints__(curRow)
# concatenate all those things together
lstCA.append(spA.data)
lstCA.append(A)
lstCArow.append(spA.row)
lstCArow.append(row)
lstCAcol.append(spA.col)
lstCAcol.append(col)
Aval = np.concatenate(lstCA)
Arow = np.concatenate(lstCArow)
Acol = np.concatenate(lstCAcol)
self.set_a_by_triplet(Aval, Arow, Acol)
self.spA = csr_matrix((self.Aval, (self.Arow, self.Acol)), shape=(self.nf, self.nx))
self.spA_coo = self.spA.tocoo()
def __setDefectPattern__(self, ndyncon, withu=True, withp=True):
"""Set the sparse linear constraints from defect constraints.
:param ndyncon: number of dynamical constraints. This sets starting row.
:param withu: bool, determines if we set defect constraint on u
:param withp: bool, determines if we set defect constraint on p
"""
dimx, dimu, dimp, dimpoint, dimdyn = self.dimx, self.dimu, self.dimp, self.dimpoint, self.dimdyn
if not withp:
dimp = 0
if not withu:
dimu = 0
if self.fixTimeMode:
lenA = (10*self.daeOrder*self.dimdyn + 3*(dimu + dimp)) * (self.N - 1)
else:
lenA = (6*self.daeOrder*self.dimdyn + 3*(dimu + dimp)) * (self.N - 1)
A = np.zeros(lenA)
row = np.zeros(lenA, dtype=int)
col = np.zeros(lenA, dtype=int)
curNA = 0
curRow = 1 + ndyncon
# find those three matrix
spL, spM, spR = self.__findMatLMRTemplate()
for i in range(self.N - 1):
midi = 2*i + 1
lefti = 2*i
righti = 2*(i + 1)
for i in range(self.daeOrder):
A[curNA: curNA + spL.nnz] = spL.data
row[curNA: curNA + spL.nnz] = spL.row + curRow
col[curNA: curNA + spL.nnz] = spL.col + lefti * dimpoint + i * dimdyn
curNA += spL.nnz
A[curNA: curNA + spM.nnz] = spM.data
row[curNA: curNA + spM.nnz] = spM.row + curRow
col[curNA: curNA + spM.nnz] = spM.col + midi * dimpoint + i * dimdyn
curNA += spM.nnz
A[curNA: curNA + spR.nnz] = spR.data
row[curNA: curNA + spR.nnz] = spR.row + curRow
col[curNA: curNA + spR.nnz] = spR.col + righti * dimpoint + i * dimdyn
curNA += spR.nnz
curRow += 2*dimdyn # since size of spL, it is 2d by 2d
# then do the constraint of u and p on nodes and knots, basically, midpoint is the average of two knots
for i in range(self.N - 1):
midi = 2*i + 1
lefti = 2*i
righti = 2*(i + 1)
A[curNA: curNA + 2*dimu] = 0.5
A[curNA + 2*dimu: curNA + 3*dimu] = -1
A[curNA + 3*dimu: curNA + 3 * dimu + 2*dimp] = 0.5
A[curNA + 3*dimu + 2 * dimp: curNA + 3*dimu + 3*dimp] = -1
row[curNA: curNA + 3 * dimu] = curRow + np.tile(np.arange(dimu), (3, 1)).flatten()
col[curNA: curNA + dimu] = lefti * dimpoint + dimx + np.arange(dimu)
col[curNA + dimu: curNA + 2 * dimu] = righti * dimpoint + dimx + np.arange(dimu)
col[curNA + 2 * dimu: curNA + 3 * dimu] = midi * dimpoint + dimx + np.arange(dimu)
curNA_ = curNA + 3 * dimu
row[curNA_: curNA_ + 3 * dimp] = curRow + dimu + np.tile(np.arange(dimp), (3, 1)).flatten()
col[curNA_: curNA_ + dimp] = lefti * dimpoint + dimx + dimu + np.arange(dimp)
col[curNA_ + dimp: curNA_ + 2 * dimp] = righti * dimpoint + dimx + dimu + np.arange(dimp)
col[curNA_ + 2 * dimp: curNA_ + 3 * dimp] = midi * dimpoint + dimx + dimu + np.arange(dimp)
curNA += 3 * (dimu + dimp)
curRow += dimu + dimp
return curRow, A, row, col
def __parseLinearConstraints__(self, curRow):
"""Parse the linear constraints and form a sparse matrix.
:param curRow: current row of accumulated constraints.
"""
lstCA = []
lstCArow = []
lstCAcol = []
for constr in self.linPointConstr: # TODO: support for time to be done
lstCA.append(constr.A.data)
lstCArow.append(constr.A.row + curRow)
lstCAcol.append(self.__patchCol__(constr.index, constr.A.col)) # take care on here
curRow += constr.A.shape[0]
for constr in self.linPathConstr:
for j in range(self.nPoint):
if not self.colloc_constr_is_on:
if j % 2 == 1:
continue
index = j
lstCA.append(constr.A.data)
lstCArow.append(constr.A.row + curRow)
lstCAcol.append(self.__patchCol__(index, constr.A.col))
curRow += constr.A.shape[0]
for constr in self.linearConstr:
# the users have to be aware of the columns
lstCA.append(constr.A.data)
lstCArow.append(constr.A.row + curRow)
lstCAcol.append(constr.A.col)
curRow += constr.A.shape[0]
return lstCA, lstCArow, lstCAcol
def __findMatLMRTemplate(self):
"""Assume h is fixed, we find the L, M, R matrix for defect constraints.
The goal is for a pair (q^(k), q^{(k+1)}) we want MatL*L+MatM*M+MatR*R=0
where L, M, R are such pair at left point, mid-point, and right point.
"""
d = self.dimdyn
matL = np.zeros((2*d, 2*d))
matM = np.zeros((2*d, 2*d))
matR = np.zeros((2*d, 2*d))
np.fill_diagonal(matL[:d, :d], 0.5)
np.fill_diagonal(matL[d:, d:], -0.25)
np.fill_diagonal(matM, -1)
np.fill_diagonal(matR[:d, :d], 0.5)
np.fill_diagonal(matR[d:, d:], -0.25)
# time dependent parts
if self.fixTimeMode:
h = (self.tf - self.t0) / (self.N - 1)
np.fill_diagonal(matL[:d, d:], h/8)
np.fill_diagonal(matL[d:, :d], -1.5/h)
np.fill_diagonal(matR[:d, d:], -h/8)
np.fill_diagonal(matR[d:, :d], 1.5/h)
spL = coo_matrix(matL)
spM = coo_matrix(matM)
spR = coo_matrix(matR)
return spL, spM, spR
[docs] def randomGenX(self):
"""A more reansonable approach to generate random guess for the problem.
It considers bounds on initial and final states so this is satisfied.
Then it linearly interpolate between states.
Controls are randomly generated within control bound, if it presents. Otherwise [-1, 1]
:return x: ndarray, (numSol, ) an initial guess of the solution
"""
nPoint = self.nPoint
dimx = self.dimx
randX = 2*np.random.random(self.numSol) - 1
X, U, P = self.__parseX__(randX)
# randomly generate x0 and xf
X[0, :dimx] = randomGenInBound(self.x0bd, self.dimx)
X[-1, :dimx] = randomGenInBound(self.xfbd, self.dimx)
# straight path go there
for i in range(self.dimx):
X[:, i] = np.linspace(X[0, i], X[-1, i], nPoint)
for i in range(nPoint):
U[i] = randomGenInBound(self.ubd, self.dimu)
if self.numP > 0:
for i in range(nPoint):
P[i] = randomGenInBound(self.pbd, self.dimp)
if self.t0ind > 0:
randX[self.t0ind] = randomGenInBound(self.t0)
if self.tfind > 0:
randX[self.tfind] = randomGenInBound(self.tf)
if self.lenAddX > 0:
for field, addx in zip(self.__parseAddX__(randX), self.addX):
field[:] = randomGenInBound([addx.lb, addx.ub], addx.n)
# I do not have to worry about objaddn since they are linear
return randX
def __turnOnGrad__(self, x0):
"""Turn on gradient, this is called after an initial x0 has been generated"""
self.grad = True
self.__getSparsity__(x0)
def __getSparsity__(self, x0):
"""Detect sparsity of the problem with an initial guess."""
numObjG = self.__getObjSparsity(x0)
self.numObjG = numObjG
# summarize number of pure linear constraints
numDynG = self.__getDynSparsity(x0)
numCG = 0 # G from C
h, useT = self.__get_time_grid__(x0)
useX, useU, useP = self.__parseX__(x0)
i = np.random.randint(self.nPoint)
tmpx = np.concatenate(([useT[i]], useX[i], useU[i], useP[i]))
for constr in self.pointConstr:
numCG += constr.nG
constr.findTimeGradient(tmpx)
if not constr.autonomous:
n = len(constr.timeindex)
numCG += (self.numT - 1) * n
for constr, (start, end) in zip(self.pathConstr, self.pathConstrIndexPairs):
tmpN = end - start
if self.colloc_constr_is_on:
numCG += (2*tmpN - 1) * constr.nG
else:
numCG += tmpN * constr.nG
constr.findTimeGradient(tmpx)
if not constr.autonomous:
n = len(constr.timeindex)
if self.colloc_constr_is_on:
numCG += (self.numT - 1) * n * self.nPoint
else:
numCG += (self.numT - 1) * n * self.N
for constr in self.nonLinConstr:
numCG += constr.nG
numG = numObjG + numDynG + numCG
self.numG = numG
self.nG = numG
def __getDynSparsity(self, x):
"""Set sparsity of the problem caused by system dynamics and other nonlinear constraints.
Sparsity pattern should be quite straight-forward to conclude since we introduce many auxiliary variables.
The nG from dae system should be directly used and it is complete.
The defect constraints introduce 5*dimq*4 gradients
:param x: ndarray, the guess/sol
"""
# this row is the case when numDefectDyn are imposed in G rather than A
# dynG = self.sys.nG * self.nPoint + (20 * self.dimdyn + 3*(self.dimu + self.dimp)) * (self.N - 1)
dynG = self.sys.nG * self.nPoint
usex = x[:self.dimpoint]
self.sys.findTimeGradient(usex)
if not self.sys.autonomous:
n = len(self.timeindex)
dynG += self.nPoint * (self.numT - 1) * n
# dynG arising from defect constraints
if not self.fixTimeMode:
dynG += (self.N - 1) * self.daeOrder * 4 * self.dimdyn # those are purely from the defect matrix
dynG += self.numT * (self.N - 1) * self.daeOrder * self.dimdyn * 2 # since time is free
return dynG
def __getObjSparsity(self, x):
"""Set sparsity structure of the problem from objective function.
The objective function pattern is composed of two parts:
- linear parts. We sum all the coefficients and find sparsity pattern out of it
- nonlinear parts. Each nonlinear objective is augmented with another row in jacobian
and a another auxiliary optimization variable s.t. c(x) = y and J += y
:param x: ndarray, the guess/sol
:returns: nG: int, # Jacobian from nonlinear objective function
"""
h, useT = self.__get_time_grid__(x)
useX, useU, useP = self.__parseX__(x)
i = np.random.randint(self.nPoint)
tmpx = np.concatenate(([useT[i]], useX[i], useU[i], useP[i]))
nG = 0
# check sparseObj mode
if self.lqrObj is not None:
nG += self.LQRnG # this is always changing with time so do not worry
for obj in self.nonLinObj:
nG += obj.nG # this assume user knows preciously what is going on so do not care
for obj in self.nonPointObj:
nG += obj.nG # in this case, if not autonomous, depending on numT, we might have to
obj.findTimeGradient(tmpx) # detect pattern
if not obj.autonomous: # since it is objective, only one piece is time, we have counted it once
nG += (self.numT - 1) # so if numT=0, we remove it, 1 is fine, 2 we increase one more
for obj in self.nonPathObj:
nG += self.nPoint * obj.nG
obj.findTimeGradient(tmpx)
if not obj.autonomous:
nG += self.nPoint * (self.numT - 1) # this is okay, I guess
return nG
def __getTimeIndices(self):
"""Utility function for assigning sparsity structure."""
t0ind = -1
tfind = -1
lenX = self.nPoint * self.dimpoint
if self.fixt0:
if self.fixtf:
pass
else:
tfind = lenX
else:
if self.fixtf:
t0ind = lenX
else:
t0ind = lenX
tfind = lenX + 1
return t0ind, tfind
def __setXbound__(self):
"""Set bounds on decision variables."""
# create bound on x
dimpnt = self.dimpoint
dimx, dimu, dimp = self.dimx, self.dimu, self.dimp
xlb = np.zeros(self.numSol)
xub = np.zeros(self.numSol)
Mxlb = np.reshape(xlb[:self.numDynVar], (self.nPoint, dimpnt))
Mxub = np.reshape(xub[:self.numDynVar], (self.nPoint, dimpnt))
Mulb = Mxlb[:, dimx:dimx+dimu]
Muub = Mxub[:, dimx:dimx+dimu]
Mplb = Mxlb[:, dimpnt-dimp:dimpnt]
Mpub = Mxub[:, dimpnt-dimp:dimpnt]
# set bounds for q and dq, agree with previous convention
if self.xbd[0] is not None:
Mxlb[:, :dimx] = self.xbd[0]
else:
Mxlb[:, :dimx] = -1e20
# set lb for x0 and xf
if self.x0bd[0] is not None:
Mxlb[0, :dimx] = self.x0bd[0]
if self.xfbd[0] is not None:
Mxlb[-1, :dimx] = self.xfbd[0]
if self.xbd[1] is not None:
Mxub[:, :dimx] = self.xbd[1]
else:
Mxub[:, :dimx] = 1e20
# set ub for x0 and xf
if self.x0bd[1] is not None:
Mxub[0, :dimx] = self.x0bd[1]
if self.xfbd[1] is not None:
Mxub[-1, :dimx] = self.xfbd[1]
# set bounds for control variable
if self.ubd[0] is not None:
Mulb[:] = self.ubd[0]
else:
Mulb[:] = -1e20
if self.ubd[1] is not None:
Muub[:] = self.ubd[1]
else:
Muub[:] = 1e20
if self.pbd[0] is not None and self.dimp > 0:
Mplb[:] = self.pbd[0]
else:
Mplb[:] = -1e20
if self.pbd[1] is not None and self.dimp > 0:
Mpub[:] = self.pbd[1]
else:
Mpub[:] = 1e20
# set bound on time
if not self.fixt0:
xlb[self.t0ind] = self.t0[0]
xub[self.t0ind] = self.t0[1]
if not self.fixtf:
xlb[self.tfind] = self.tf[0]
xub[self.tfind] = self.tf[1]
# set bound on addX
if self.lenAddX != 0:
curN = self.numTraj
for addx in self.addX:
xlb[curN: curN + addx.n] = addx.lb
xub[curN: curN + addx.n] = addx.ub
# set bound on objaddn, this is obvious
xlb[-self.objaddn:] = -1e20
xub[-self.objaddn:] = 1e20
# assign to where it should belong to
self.set_xlb(xlb)
self.set_xub(xub)
def __setFbound__(self):
"""Set bound on F"""
# set bound on F
numF = self.numF
numDyn = self.numDyn
clb = np.zeros(numF)
cub = np.zeros(numF)
clb[0] = -1e20
cub[0] = 1e20
cind0 = 1 + numDyn
cind0 = self._setNonLinConstr(clb, cub, cind0)
cind0 = self._setLinConstr(clb, cub, cind0)
# the bounds for objaddn is 0 so we are good so far
# assign to where it should belong to
self.lb = clb
self.ub = cub
def _setNonLinConstr(self, clb, cub, cind0):
for constr in self.pointConstr:
if constr.lb is not None:
clb[cind0: cind0 + constr.nf] = constr.lb
if constr.ub is not None:
cub[cind0: cind0 + constr.nf] = constr.ub
cind0 += constr.nf
for constr, (start, end) in zip(self.pathConstr, self.pathConstrIndexPairs):
tmpN = end - start
if self.colloc_constr_is_on:
useN = 2 * tmpN - 1
else:
useN = tmpN
tmplb = np.reshape(clb[cind0: cind0 + constr.nf * useN], (useN, constr.nf))
tmpub = np.reshape(cub[cind0: cind0 + constr.nf * useN], (useN, constr.nf))
cind0 += constr.nf * useN
if constr.lb is not None:
tmplb[:] = constr.lb
if constr.ub is not None:
tmpub[:] = constr.ub
for constr in self.nonLinConstr:
if constr.lb is not None:
clb[cind0: cind0 + constr.nf] = constr.lb
if constr.ub is not None:
cub[cind0: cind0 + constr.nf] = constr.ub
cind0 += constr.nf
return cind0
def _setLinConstr(self, clb, cub, cind0):
# the rest are linear constraints and we should write those bounds, too
for constr in self.linPointConstr:
cindf = cind0 + constr.A.shape[0]
clb[cind0: cindf] = constr.lb
cub[cind0: cindf] = constr.ub
cind0 = cindf
for constr in self.linPathConstr:
if self.colloc_constr_is_on:
N = self.nPoint
else:
N = self.N
cindf = cind0 + N * constr.A.shape[0]
clb[cind0: cindf] = np.tile(constr.lb, (N, 1)).flatten()
cub[cind0: cindf] = np.tile(constr.ub, (N, 1)).flatten()
cind0 = cindf
for constr in self.linearConstr:
cindf = cind0 + constr.A.shape[0]
clb[cind0: cindf] = constr.lb
cub[cind0: cindf] = constr.ub
cind0 = cindf
return cind0
def __get_time_grid__(self, x):
"""Based on initial guess x, get the time grid for discretization.
:param x: ndarray, the guess/sol.
:returns: h: float, grid size
:returns: useT: the grid being used
"""
if self.fixt0:
uset0 = self.t0
else:
uset0 = x[self.t0ind]
if self.fixtf:
usetf = self.tf
else:
usetf = x[self.tfind]
h = (usetf - uset0) / (self.N - 1)
useT = np.linspace(uset0, usetf, self.nPoint)
return h, useT
def __parseX__(self, x):
"""Parse guess/sol into X, U, P"""
X = np.reshape(x[:self.numDynVar], (self.nPoint, self.dimpoint))
useX = X[:, :self.dimx]
useU = X[:, self.dimx:self.dimpoint - self.dimp]
useP = X[:, self.dimpoint - self.dimp:]
return useX, useU, useP
[docs] def parseF(self, guess, y=None):
"""Give an guess, evaluate it and parse into parts.
:param guess: ndarray, (numSol, ) a guess or a solution to check
:param y: ndarray, (numF, ) if None, it stores the solution
:returns: dict, containing objective and parsed constraints
"""
assert len(guess) == self.numSol
N = self.N
nPoint = self.nPoint
dimx = self.dimx
dimdyn = self.dimdyn
if y is None:
y = np.zeros(self.numF)
if self.grad:
self.__callg__(guess, y, np.zeros(1), np.zeros(1), np.zeros(1), False, False)
else:
self.__callf__(guess, y)
y += self.spA.dot(guess)
obj = y[0]
dynCon = np.reshape(y[1:(2*N-1)*dimdyn+1], (2*N - 1, dimdyn))
curN = 1 + (2 * N - 1) * dimdyn
curNf = curN + self.dynDefectSize * (N - 1)
defectCon = dict()
dynDefectCon = np.reshape(y[curN: curNf], (N - 1, -1))
defectCon['dyn'] = dynDefectCon
curN = curNf
if self.defectU:
dimu = self.dimu
else:
dimu = 0
if self.defectP:
dimp = self.dimp
else:
dimp = 0
curNf += (dimu + dimp) * (N - 1)
upDefectCon = np.reshape(y[curN: curNf], (N - 1, -1))
if dimu > 0:
defectCon['u'] = upDefectCon[:, :dimu]
if dimp > 0:
defectCon['p'] = upDefectCon[:, dimu:dimu+dimp]
curN = curNf
pointCon = []
for constr in self.pointConstr:
pointCon.append(y[curN: curN + constr.nf])
curN += constr.nf
pathCon = []
for constr, (start, end) in zip(self.pathConstr, self.pathConstrIndexPairs):
tmpN = end - start
if self.colloc_constr_is_on:
useN = 2*tmpN - 1
else:
useN = tmpN
pathCon.append(np.reshape(y[curN: curN+useN*constr.nf], (useN, constr.nf)))
curN += useN*constr.nf
nonLinCon = []
for constr in self.nonLinConstr:
nonLinCon.append(y[curN: curN+constr.nf])
curN += constr.nf
# all linear constraints can be ignored, here we ignore them
# check bounds, return a -1, 1 value for non-equality bounds, and 0 for equality bounds
useX, useU, useP = self.__parseX__(guess)
Xbound = checkInBounds(useX[:, :dimx], self.xbd)
x0bound = checkInBounds(useX[0, :dimx], self.x0bd)
xfbound = checkInBounds(useX[-1, :dimx], self.xfbd)
ubound = checkInBounds(useU, self.ubd)
if self.dimp > 0:
pbound = checkInBounds(useP, self.pbd)
else:
pbound = None
if self.t0ind > 0:
t0bound = checkInBounds(guess[self.t0ind], self.t0)
else:
t0bound = None
if self.tfind > 0:
tfbound = checkInBounds(guess[self.tfind], self.tf)
else:
tfbound = None
if self.lenAddX > 0:
addx = self.__parseAddX__(guess)
addXbound = [checkInBounds(addx_, [addx__.lb, addx__.ub]) for addx_, addx__ in zip(addx, self.addX)]
else:
addXbound = None
useX, useU, useP = self.__parseX__(guess)
objs = guess[self.numSol - self.objaddn:]
rst = {'obj': obj, 'objs': objs, 'dyn': dynCon, 'defect': defectCon, 'point': pointCon, 'path': pathCon, 'nonlin': nonLinCon,
'xbd': Xbound, 'ubd': ubound, 'x0bd': x0bound, 'xfbd': xfbound, 'pbd': pbound,
't0bd': t0bound, 'tfbd': tfbound, 'addXbd': addXbound,
'x': useX, 'u': useU, 'p': useP}
if self.t0ind > 0:
rst['t0'] = guess[self.t0ind]
else:
rst['t0'] = self.t0
if self.tfind > 0:
rst['tf'] = guess[self.tfind]
else:
rst['tf'] = self.tf
rst['t'] = np.linspace(rst['t0'], rst['tf'], 2*N - 1)
# parse addx
if self.lenAddX > 0:
addx = self.__parseAddX__(guess)
for i, addx_ in enumerate(addx):
rst['addx_%d' % i] = addx_
return rst
def __parseAddX__(self, x):
numTraj = self.numTraj
addX = []
for addx in self.addX:
addX.append(x[numTraj: numTraj + addx.n])
numTraj += addx.n
return addX
def __parseObj__(self, x):
return x[self.numSol - self.objaddn:]
[docs] def getAddXIndexByIndex(self, i):
"""With i as index of addx, it returns the starting index in solution vector for this one.
:param i: int, the index of addX we want to query.
"""
index = self.numTraj
for j in range(i):
index += self.addX[j].n
return index
[docs] def getStateIndexByIndex(self, i):
"""With i as index for state variable, return the starting index in solution vector.
:param i: int, the index of State we want to query
"""
if i >= 0:
return self.dimpoint * i
else:
return (self.nPoint + i) * self.dimpoint
[docs] def getContrlIndexByIndex(self, i):
"""With i as index for control variable, return the starting index in solution vector.
:param i: int, the index of control we want to query
"""
if i >= 0:
return self.dimpoint * i + self.dimx
else:
return (self.nPoint + i) * self.dimpoint + self.dimx
[docs] def getParamIndexByIndex(self, i):
"""With i as index for parameter variable, return the starting index in solution vector.
:param i: int, the index of parameter we want to query
"""
if i >= 0:
return self.dimpoint * i + self.dimx + self.dimu
else:
return (self.nPoint + i) * self.dimpoint + self.dimx + self.dimu
[docs] def __callg__(self, x, y, G, row, col, rec, needg):
"""Evaluate those constraints, objective functions, and constraints. It simultaneously allocates sparsity matrix.
:param x: ndarray, the solution to the problem
:param y: ndarray, return F
:param G/row/col: ndarray, information of gradient
:param rec/needg: if we record/ if we need gradient
"""
y[0] = 0 # since this row is purely linear
h, useT = self.__get_time_grid__(x)
useX, useU, useP = self.__parseX__(x)
# loop over all system dynamics constraint
curRow = 1
curNg = 0
curRow, curNg = self.__dynconstr_mode_g__(curRow, curNg, h, useT, useX, useU, useP, y, G, row, col, rec, needg)
curRow, curNg = self.__constr_mode_g__(curRow, curNg, h, useT, useX, useU, useP, x, y, G, row, col, rec, needg)
curRow += self.numLinCon
# loop over all the objective functions, I haven't checked if order is correct since some linear constraints are followed
curRow, curNg = self.__obj_mode_g__(curRow, curNg, h, useT, useX, useU, useP, x, y, G, row, col, rec, needg)
return curRow, curNg
def __dynconstr_mode_g__(self, curRow, curNg, h, useT, useX, useU, useP, y, G, row, col, rec, needg):
"""Evaluate the constraints imposed by system dynamics"""
dimx, dimu, dimp = self.dimx, self.dimu, self.dimp
dimpoint = self.dimpoint
dimdyn = self.dimdyn # this works for many cases
nPoint = self.nPoint
# first let's check the 2*N - 1 dimdyn constraints from dynamics
cDyn = np.reshape(y[curRow:curRow + nPoint * dimdyn], (nPoint, dimdyn))
for i in range(nPoint):
if self.sys.autonomous:
Gpiece = G[curNg: curNg + self.sys.nG]
rowpiece = row[curNg: curNg + self.sys.nG]
colpiece = col[curNg: curNg + self.sys.nG]
self.sys.dyn(useT[i], useX[i], useU[i], useP[i], cDyn[i], Gpiece, rowpiece, colpiece, rec, needg)
if needg:
curNg += self.sys.nG
if rec:
rowpiece[:] += curRow
colpiece[:] = self.__patchCol__(i, colpiece[:])
else:
self.sys.dyn(useT[i], useX[i], useU[i], useP[i], cDyn[i], self.G, self.row, self.col, rec, needg)
curNg = self.__copy_into_g__(i, G, row, col, curRow, curNg, self.sys.nG, self.sys.timeindex, False, rec,
self.G, self.row, self.col)
curRow += self.dimdyn
# offset of row number due to defect dynamics constraint
if self.fixTimeMode:
curRow += self.numDefectDyn
else:
# manually set those defect dynamics and gradients, etc
defectRow = (self.N - 1) * 2 * self.daeOrder * dimdyn
cDefect = np.reshape(y[curRow: curRow + defectRow], (self.N - 1, self.daeOrder, 2, dimdyn))
bscIndex = np.arange(dimdyn)
for i in range(self.N - 1):
lefti, righti = 2 * i, 2 * i + 2
for j in range(self.daeOrder):
cDefect[i, j, 0, :] = h/8 * useX[lefti, (j+1)*dimdyn:(j+2)*dimdyn] - h/8 * useX[righti, (j+1)*dimdyn:(j+2)*dimdyn]
cDefect[i, j, 1, :] = -1.5/h * useX[lefti, j*dimdyn:(j+1)*dimdyn] + 1.5/h * useX[righti, j*dimdyn:(j+1)*dimdyn]
if needg:
G[curNg: curNg + dimdyn] = h / 8
G[curNg + dimdyn: curNg + 2*dimdyn] = -h / 8
G[curNg + 2*dimdyn: curNg + 3*dimdyn] = -1.5 / h
G[curNg + 3*dimdyn: curNg + 4*dimdyn] = 1.5 / h
if rec:
row[curNg:curNg + dimdyn] = curRow + bscIndex
row[curNg + dimdyn:curNg + 2*dimdyn] = curRow + bscIndex
col[curNg:curNg + dimdyn] = lefti * dimpoint + j * dimdyn + dimdyn + bscIndex
col[curNg + dimdyn:curNg + 2*dimdyn] = righti * dimpoint + j * dimdyn + dimdyn + bscIndex
row[curNg + 2*dimdyn:curNg + 3*dimdyn] = curRow + bscIndex + dimdyn
row[curNg + 3*dimdyn:curNg + 4*dimdyn] = curRow + bscIndex + dimdyn
col[curNg + 2*dimdyn:curNg + 3*dimdyn] = lefti * dimpoint + j * dimdyn + bscIndex
col[curNg + 3*dimdyn:curNg + 4*dimdyn] = righti * dimpoint + j * dimdyn + bscIndex
curNg += 4 * dimdyn
# if time related, we have to also consider them
if not self.fixTimeMode:
pc1pt = (useX[lefti, (j+1)*dimdyn:(j+2)*dimdyn] - useX[righti, (j+1)*dimdyn:(j+2)*dimdyn]) / 8
pc2pt = -(-useX[lefti, j*dimdyn:(j+1)*dimdyn] + useX[righti, j*dimdyn:(j+1)*dimdyn]) * 1.5 / h ** 2
if self.t0ind > 0:
G[curNg: curNg + dimdyn] = -pc1pt / (self.N - 1)
G[curNg + dimdyn: curNg + 2*dimdyn] = -pc2pt / (self.N - 1)
if rec:
row[curNg: curNg + dimdyn] = curRow + bscIndex
row[curNg + dimdyn: curNg + 2*dimdyn] = curRow + bscIndex + dimdyn
col[curNg: curNg + 2*dimdyn] = self.t0ind
curNg += 2 * dimdyn
if self.tfind > 0:
G[curNg: curNg + dimdyn] = pc1pt / (self.N - 1)
G[curNg + dimdyn: curNg + 2*dimdyn] = pc2pt / (self.N - 1)
if rec:
row[curNg: curNg + dimdyn] = curRow + bscIndex
row[curNg + dimdyn: curNg + 2*dimdyn] = curRow + bscIndex + dimdyn
col[curNg: curNg + 2*dimdyn] = self.tfind
curNg += 2*dimdyn
curRow += 2 * dimdyn
if self.defectU:
curRow += (self.N - 1) * self.dimu
if self.defectP:
curRow += (self.N - 1) * self.dimp
return curRow, curNg
def __copy_into_g__(self, index, G, row, col, curRow, curNg, nG, time_index, plus, rec,
G_src, row_src, col_src, col_offset=0):
"""With sparsity calculated in self.G, we assign to correct G.
:param index: int, we are evaluating this at which point
:param G, row, col: the G, row, col vector storing sparse Jacobian.
:param curRow: accumulated row number.
:param curNg: accumulated sparse Jacobian number
:param nG: number of non-zero of Jacobian, this indicates how long of self.G we shall use
:param timeindex: index indicating time related.
:param plus: bool, if we plus value to time-related index (integral one)
:param rec: bool, if we record index into row and col
:param G_src/row_src/col_src: where we copy value from
:param col_offset: int, offset of column, it is only used for multiple-phase problem
:return curNg: updated occupied Ng
"""
# use time index to build the mask for selecting data
G_ = G_src[:nG]
timemask = np.zeros(nG, dtype=bool)
timemask[time_index] = True # get a mask for time-related gradient
statemask = np.logical_not(timemask)
lenstate = nG - len(time_index)
lenTime = nG - lenstate
G[curNg: curNg + lenstate] = G_[statemask]
if rec:
col_ = col_src[:nG]
row_ = row_src[:nG]
col[curNg: curNg + lenstate] = col_[statemask] - 1 + index * self.dimpoint + col_offset
row[curNg: curNg + lenstate] = row_[statemask] + curRow
curNg += lenstate
# for time related columns
if self.t0ind > 0:
ptpt0 = (self.N - 1 - index) / (self.N - 1)
if plus:
G[curNg: curNg + lenTime] += G_[time_index] * ptpt0
else:
G[curNg: curNg + lenTime] = G_[time_index] * ptpt0
if rec:
row[curNg: curNg + lenTime] = row_[time_index] + curRow
col[curNg: curNg + lenTime] = self.t0ind + col_offset
curNg += lenTime
if self.tfind > 0:
ptptf = index / (self.N - 1)
if plus:
G[curNg: curNg + lenTime] += G_[time_index] * ptptf
else:
G[curNg: curNg + lenTime] = G_[time_index] * ptptf
if rec:
row[curNg: curNg + lenTime] = row_[time_index] + curRow
col[curNg: curNg + lenTime] = self.tfind + col_offset
curNg += lenTime
return curNg
def __obj_mode_g__(self, curRow, curNg, h, useT, useX, useU, useP, x, y, G, row, col, rec, needg):
"""Calculate objective function. Basically it moves all nonlinear objective function to the final rows
See __constrModeG__ for arguments and output."""
tmpout = np.zeros(1)
# first lets do lqrobj
if self.lqrObj is not None: # the lqr obj
Gpiece = G[curNg: curNg + self.LQRnG]
rowpiece = row[curNg: curNg + self.LQRnG]
colpiece = col[curNg: curNg + self.LQRnG]
self.lqrObj(h, useX, useU, useP, tmpout, Gpiece, rowpiece, colpiece, rec, needg)
y[curRow] = tmpout[0]
if rec:
rowpiece[:] = curRow
curNg += self.LQRnG
curRow += 1
# still in the point, path, obj order
if len(self.nonPointObj) > 0:
for obj in self.nonPointObj:
tmpx = np.concatenate(([useT[obj.index]], useX[obj.index], useU[obj.index], useP[obj.index]))
if obj.autonomous:
Gpiece = G[curNg: curNg + obj.nG]
rowpiece = row[curNg: curNg + obj.nG]
colpiece = col[curNg: curNg + obj.nG]
obj.__callg__(tmpx, tmpout, Gpiece, rowpiece, colpiece, rec, needg)
if rec:
rowpiece[:] = curRow
colpiece[:] = self.__patchCol__(obj.index, colpiece)
curNg += obj.nG
else:
obj.__callg__(tmpx, tmpout, self.G, self.row, self.col, rec, needg)
curNg = self.__copy_into_g__(obj.index, G, row, col, curRow, curNg, obj.nG, obj.timeindex, False, rec, self.G, self.row, self.col)
y[curRow] = tmpout[0]
curRow += 1
if len(self.nonPathObj) > 0:
weight = np.zeros((self.nPoint, 1))
weight[1::2] = 2.0 / 3.0 * h
weight[0::2] = 1.0 / 3.0 * h
weight[0] = 1.0 / 6.0 * h
weight[-1] = 1.0 / 6.0 * h
for obj in self.nonPathObj:
y[curRow] = 0
if obj.autonomous:
for i in range(self.nPoint):
tmpx = np.concatenate(([useT[i]], useX[i], useU[i], useP[i]))
Gpiece = G[curNg: curNg + obj.nG]
rowpiece = row[curNg: curNg + obj.nG]
colpiece = col[curNg: curNg + obj.nG]
obj.__callg__(tmpx, tmpout, Gpiece, rowpiece, colpiece, rec, needg)
Gpiece[:] *= weight[i]
if rec:
rowpiece[:] = curRow
colpiece[:] = self.__patchCol__(i, colpiece)
curNg += obj.nG
else:
for i in range(self.nPoint):
tmpx = np.concatenate(([useT[i]], useX[i], useU[i], useP[i]))
obj.__callg__(tmpx, tmpout, self.G, self.row, self.col, rec, needg)
self.G[:obj.nG] *= weight[i]
curNg = self.__copy_into_g__(i, G, row, col. curRow, curNg, obj.nG, obj.timeindex, True, rec, self.G, self.row, self.col)
y[curRow] += weight[i] * tmpout[0]
curRow += 1
if(len(self.nonLinObj)) > 0:
for obj in self.nonLinObj: # nonlinear cost function
Gpiece = G[curNg: curNg + obj.nG]
rowpiece = row[curNg: curNg + obj.nG]
colpiece = col[curNg: curNg + obj.nG]
obj.__callg__(x, tmpout, Gpiece, rowpiece, colpiece, rec, needg)
y[curRow] = tmpout[0]
if rec:
rowpiece[:] = curRow
curNg += obj.nG
curRow += 1
return curRow, curNg
def __constr_mode_g__(self, curRow, curNg, h, useT, useX, useU, useP, x, y, G, row, col, rec, needg):
"""Calculate constraint function. G mode
:param curRow: int, index from which we write on
:param curNg: int, current index in G
:param h, useT, useX, useU, useP: parsed solution
:param y: ndarray, the F to be written
:param G, row, col: ndarray, the G to be written and the locations
:param rec: bool, if we record row and col
:param needg: bool, if we need gradient information
:returns: curRow: current row after we write on y
:returns: curNg: current index in G after this
"""
# loop over other constraints
if len(self.pointConstr) > 0:
for constr in self.pointConstr:
tmpx = np.concatenate(([useT[constr.index]], useX[constr.index], useU[constr.index], useP[constr.index]))
if constr.autonomous:
pieceG = G[curNg: curNg + constr.nG]
pieceRow = row[curNg: curNg + constr.nG]
pieceCol = col[curNg: curNg + constr.nG]
constr.__callg__(tmpx, y[curRow: curRow + constr.nf], pieceG, pieceRow, pieceCol, rec, needg)
if rec:
pieceRow += curRow
pieceCol[:] = self.__patchCol__(constr.index, pieceCol)
curNg += constr.nG
else:
constr.__callg__(tmpx, y[curRow: curRow + constr.nf], self.G, self.row, self.col, rec, needg)
curNg = self.__copy_into_g__(constr.index, G, row, col, curRow, curNg, constr.nG, constr.timeindex,
True, rec, self.G, self.row, self.col)
curRow += constr.nf
if len(self.pathConstr) > 0:
for constr, (start, end) in zip(self.pathConstr, self.pathConstrIndexPairs):
if constr.autonomous:
for j in range(2*start, 2*end - 1):
if not self.colloc_constr_is_on:
if j % 2 == 1:
continue
i = j
tmpx = np.concatenate(([useT[i]], useX[i], useU[i], useP[i]))
pieceG = G[curNg: curNg + constr.nG]
pieceRow = row[curNg: curNg + constr.nG]
pieceCol = col[curNg: curNg + constr.nG]
constr.__callg__(tmpx, y[curRow: curRow + constr.nf], pieceG, pieceRow, pieceCol, rec, needg)
if rec:
pieceRow += curRow
pieceCol[:] = self.__patchCol__(i, pieceCol)
curRow += constr.nf
curNg += constr.nG
else:
for j in range(2*start, 2*end - 1):
if not self.colloc_constr_is_on:
if j % 2 == 1:
continue
i = j
tmpx = np.concatenate(([useT[i]], useX[i], useU[i], useP[i]))
constr.__callg__(tmpx, y[curRow: curRow + constr.nf], self.G, self.row, self.col, rec, needg)
curNg = self.__copy_into_g__(i, G, row, col, curRow, curNg, constr.nG, constr.timeindex, True, rec,
self.G, self.row, self.col)
curRow += constr.nf
if len(self.nonLinConstr) > 0:
for constr in self.nonLinConstr:
pieceG = G[curNg: curNg + constr.nG]
pieceRow = row[curNg: curNg + constr.nG]
pieceCol = col[curNg: curNg + constr.nG]
constr.__callg__(x, y[curRow: curRow + constr.nf], pieceG, pieceRow, pieceCol, rec, needg)
if rec:
pieceRow += curRow
curRow += constr.nf
curNg += constr.nG
return curRow, curNg
# interface functions for ipopt
[docs] def ipEvalF(self, x):
"""The eval_f function required by ipopt.
:param x: a guess/solution of the problem
:return f: float, objective function
"""
row0 = self.spA.getrow(0)
return np.dot(row0.data, x[row0.indices])
[docs] def ipEvalGradF(self, x):
"""Evaluation of the gradient of objective function.
:param x: guess/solution to the problem
:return grad: gradient of objective function w.r.t x
"""
return self.spA.getrow(0).toarray().flatten()
[docs] def ipEvalG(self, x):
"""Evaluation of the constraint function.
:param x: ndarray, guess/solution to the problem.
:return g: constraint function
"""
y = np.zeros(self.numF)
G = np.zeros(1)
row = np.zeros(1, dtype=int)
col = np.zeros(1, dtype=int)
self.__callg__(x, y, G, row, col, False, False)
# y should plus A times x
y += self.spA.dot(x)
return y
[docs] def ipEvalJacG(self, x, flag):
"""Evaluate jacobian of constraints. I simply call __callg__
:param x: ndarray, guess / solution to the problem
:param flag: bool, True return row/col, False return values
"""
y = np.zeros(self.numF)
G = np.zeros(self.nG + self.spA.nnz)
if flag:
row = np.ones(self.nG + self.spA.nnz, dtype=int)
col = np.ones(self.nG + self.spA.nnz, dtype=int)
tmpx = self.randomGenX()
self.__callg__(tmpx, y, G, row, col, True, True)
# good news is there is no overlap of A and G
row[self.nG:] = self.spA_coo.row
col[self.nG:] = self.spA_coo.col
return row, col
else:
row = np.ones(1, dtype=int)
col = np.ones(1, dtype=int)
self.__callg__(x, y, G, row, col, False, True)
G[self.nG:] = self.spA_coo.data
return G
def __patchCol__(self, index, col, col_offset=0):
"""Find which indices it belongs to the original one for a local matrix at index with col.
Since we have changed how variables are arranged, now it should be quite straightforward to do so.
"""
col = col[col > 0] # get rid of those with time
return col - 1 + self.getStateIndexByIndex(index) + col_offset
[docs] def parse_sol(self, sol):
"""Call parseX function from utility and return a dict of solution."""
X, U, P = self.__parseX__(sol)
if self.dimp == 0:
P = None
h, tgrid = self.__get_time_grid__(sol)
obj = self.__parseObj__(sol)
if self.lenAddX == 0:
return {'t': tgrid, 'x': X, 'u': U, 'p': P, 'obj': obj}
else:
return {'t': tgrid, 'x': X, 'u': U, 'p': P, 'addx': self.__parseAddX__(sol), 'obj': obj}
[docs] def addLQRObj(self, lqrobj):
"""Add a lqr objective function to the problem. It changes lqrObj into a function being called.
:param lqrobj: a lqrObj class.
"""
Fcol = lqrobj.F.col
Qcol = lqrobj.Q.col
Rcol = lqrobj.R.col
useF = len(Fcol)
useQ = len(Qcol)
useR = len(Rcol)
if useF > 0 and useQ > 0:
assert np.allclose(Fcol, Qcol)
if lqrobj.P is not None:
Pcol = lqrobj.P.col
useP = len(Pcol)
else:
useP = 0
Pcol = []
if lqrobj.tfweight is not None:
tfweight = lqrobj.tfweight
else:
tfweight = 0.0
nPoint = self.nPoint
num1 = nPoint * (useQ + useR + useP)
self.LQRnG = num1 + self.numT
if useF > 0 and useQ == 0:
self.LQRnG += useF
weight = np.zeros((nPoint, 1))
weight[1::2] = 2.0 / 3.0
weight[0::2] = 1.0 / 3.0
weight[0] = 1.0 / 6.0
weight[-1] = 1.0 / 6.0
baseCol = self.dimpoint * np.arange(self.nPoint)[:, np.newaxis] # a nPoint by 1 column matrix
def __callg__(h, useX, useU, useP_, y, G, row, col, rec, needg):
"""Calculate the lqr cost with gradient information.
:param h: float, grid size
:param useX, useU, useP: ndarray, parsed X, U, P from x
:param y: ndarray, a location to write the objective function onto
:param G/row/col: the gradient information
:param rec: if we record row and col
:param needg: if we need gradient information.
"""
if rec:
row[:] = 0
col_ = np.reshape(col[:num1], (nPoint, -1))
yF = 0.0
yQ = 0.0
yR = 0.0
yP = 0.0
yTf = tfweight * (h * (self.N - 1))
curG = 0
if needg:
G_ = np.reshape(G[:num1], (nPoint, -1)) # use the same structure with col_
if useQ > 0:
yQ = np.dot(weight[:, 0], np.sum(((useX[:, Qcol] - lqrobj.xbase[Qcol]) ** 2) * lqrobj.Q.data, axis=1)) * h
if needg:
G_[:, :useQ] = 2.0 * h * (weight * ((useX[:, Qcol] - lqrobj.xbase[Qcol]) * lqrobj.Q.data))
if rec:
col_[:, :useQ] = Qcol + baseCol
curG += nPoint * useQ
if useR > 0:
yR = np.dot(weight[:, 0], np.sum(((useU[:, Rcol] - lqrobj.ubase[Rcol]) ** 2) * lqrobj.R.data, axis=1)) * h
if needg:
G_[:, useQ: useQ + useR] = (2.0 * h * (weight * ((useU[:, Rcol] - lqrobj.ubase[Rcol]) * lqrobj.R.data)))
if rec:
col_[:, useQ: useQ+useR] = Rcol + baseCol + self.dimx
curG += nPoint * useR
if useP > 0:
yP = np.dot(weight[:, 0], np.sum(((useP_[:, Pcol] - lqrobj.pbase[Pcol]) ** 2) * lqrobj.P.data, axis=1)) * h
if needg:
G_[:, useQ+useR: useQ+useR+useP] = (2.0 * h * (weight * ((useP_[:, Pcol] - lqrobj.pbase[Pcol]) * lqrobj.P.data)))
if rec:
col_[:, useQ+useR: useQ+useR+useP] = Pcol + baseCol + self.dimx + self.dimu
curG += nPoint * useP
if useF > 0:
yF = np.sum(lqrobj.F.data * ((useX[-1, Fcol] - lqrobj.xfbase[Fcol]) ** 2))
if useQ > 0:
if needg:
n0 = curG - numPoint * (useR + useP) - useF # locate at the last row
G[n0: n0 + useF] += 2.0 * lqrobj.F.data * (useX[-1, Fcol] - lqrobj.xfbase[Fcol])
else:
G[curG: curG + useF] = 2.0 * lqrobj.F.data * (useX[-1, Fcol] - lqrobj.xfbase[Fcol])
if rec:
col[curG: curG + useF] = Fcol + baseCol[-1]
curG += useF
if needg:
if self.t0ind > 0:
if h <= 1e-8:
pass
G[curG] = -(yQ + yR + yP) / h / (self.N - 1) - tfweight
if rec:
col[curG: curG+1] = self.t0ind
curG += 1
if self.tfind > 0:
G[curG] = (yQ + yR + yP) / h / (self.N - 1) + tfweight
if rec:
col[curG: curG+1] = self.tfind
curG += 1
y[0] = yF + yQ + yR + yP + yTf
self.lqrObj = __callg__
[docs] def addVanillaQuadPenalty(self, indices, weights):
"""Add a quadratic penalty term based on indices in the solution vector and weights.
This is dangerous and might cause unexpected trouble unless you are sure indices are correct.
You can always use addStateQuadPenalty/addControlQuadPenalty/addParamQuadPenalty/addAddXQuadPenalty to finish these.
:param indices: indices of variables to be penalized
:param weights: float/ndarray weights associated with those variables.
"""
self.addNonLinearObj(quadPenalty(indices, weights))
[docs] def addStateQuadPenalty(self, index, weights, mask=None):
"""Add a quadratic penalty on selected state variables.
:param index: int/array-like, indices of state variables to be penalized.
:param weights: float/array-like, weights of penalty
:param mask: mask-like, filter for selecting subset of variables
"""
stateindex = np.array(index)
if np.isscalar(weights):
useweight = weights
else:
useweight = []
index = []
for idx in stateindex:
i0 = self.getStateIndexByIndex(idx)
if mask is None:
index.append(np.arange(i0, i0 + self.dimx))
else:
index.append(np.arange(i0, i0 + self.dimx)[mask])
if not np.isscalar(weights):
useweight.append(weights)
indexes = np.concatenate(index)
if isinstance(weights, list):
useweight = np.concatenate(useweight)
self.addVanillaQuadPenalty(indexes, useweight)
[docs] def addControlQuadPenalty(self, index, weights, mask=None):
"""Add a quadratic penalty on selected control variables.
:param index: int/array-like, indices of control variables to be penalized.
:param weights: float/array-like, weights of penalty
:param mask: filter to select subset
"""
ctrlindex = np.array(index)
if np.isscalar(weights):
useweight = weights
else:
useweight = []
index = []
for idx in ctrlindex:
i0 = self.getControlIndexByIndex(idx)
if mask is None:
index.append(np.arange(i0, i0 + self.dimu))
else:
index.append(np.arange(i0, i0 + self.dimu)[mask])
if isinstance(weights, list):
useweight.append(weights)
indexes = np.concatenate(index)
if isinstance(weights, list):
useweight = np.concatenate(useweight)
self.addVanillaQuadPenalty(indexes, useweight)
[docs] def addParamQuadPenalty(self, index, weights, mask=None):
"""Add a quadratic penalty on selected parameter variables.
:param index: int/array-like, indices of parameter variables to be penalized.
:param weights: float/array-like, weights of penalty
:param mask: filter of variables
"""
paramindex = np.array(index)
if np.isscalar(weights):
useweight = weights
else:
useweight = []
index = []
for idx in ctrlindex:
i0 = self.getParamIndexByIndex(idx)
if mask is None:
index.append(np.arange(i0, i0 + self.dimp))
else:
index.append(np.arange(i0, i0 + self.dimp)[mask])
if isinstance(weights, list):
useweight.append(weights)
indexes = np.concatenate(index)
if isinstance(weights, list):
useweight = np.concatenate(useweight)
self.addVanillaQuadPenalty(indexes, useweight)
[docs] def addAddXQuadPenalty(self, index, weights, mask=None):
"""Add quadratic penalty to addx variables.
The API is slightly different from previous three since we cannot guarantee weights are of the same lenght.
:param index: int, indices of parameter variables to be penalized.
:param weights: float/array-like, weights of penalty
:param mask: filter
"""
assert isinstance(index, int)
i0 = self.getAddXIndexByIndex(index)
naddx = self.addX[index].n
if mask is None:
indexes = np.arange(i0, i0 + naddx)
else:
indexes = np.arange(i0, i0 + naddx)[mask]
self.addVanillaQuadPenalty(indexes, weights)
[docs] def addLinearObj(self, linObj):
"""Add linear objective function.
:param linObj: linearObj class
"""
assert isinstance(linObj, linearObj)
self.linearObj.append(linObj)
[docs] def addLinearPointObj(self, linPointObj, path=False):
"""Add linear point objective function.
:param linPointObj: linearPointObj class
:param path: bool, if this is path obj (at every point except for final one)
"""
assert isinstance(linPointObj, linearPointObj)
if path:
self.linPathObj.append(linPointObj)
else:
self.linPointObj.append(linPointObj)
[docs] def addNonLinearObj(self, nonlinObj):
"""Add nonlinear objective function.
:param nonLinObj: a nonLinObj class
"""
assert isinstance(nonlinObj, nonLinearObj)
self.nonLinObj.append(nonlinObj)
[docs] def addNonLinearPointObj(self, nonPntObj, path=False):
"""Add nonlinear point objective.
:param nonPntObj: nonLinObj class
:param path: bool, if this obj is pointwise
"""
assert isinstance(nonPntObj, nonLinearPointObj)
if path:
self.nonPathObj.append(nonPntObj)
else:
self.nonPointObj.append(nonPntObj)
[docs] def addNonLinearPointConstr(self, pntConstr, path=False, **kwargs):
"""Add point constraint.
:param pntConstr: pointConstr class
:param path: bool, if this obj
:kwargs: additional parameters, users can specify starting and ending indexes by specifying start and end
"""
assert isinstance(pntConstr, nonLinearPointConstr)
if path:
start = kwargs.get('start', 0)
end = kwargs.get('end', self.N)
if end <= 0:
end = self.N + end
self.pathConstr.append(pntConstr)
self.pathConstrIndexPairs.append((start, end))
else:
self.pointConstr.append(pntConstr)
[docs] def addNonLinearConstr(self, constr):
"""Add a general nonlinear constraint.
:param constr: nonLinConstr class
"""
assert isinstance(constr, nonLinearConstr)
self.nonLinConstr.append(constr)
[docs] def addLinearConstr(self, constr):
"""Add a linear constraint to the problem.
:param constr: a linearConstr object
"""
assert isinstance(constr, linearConstr)
self.linearConstr.append(constr)
[docs] def addLinearPointConstr(self, constr, path=False):
"""Add a linear point constraint to the problem.
:param constr: a linearPointConstr object
:param path: if this constraint is path constraint
"""
assert isinstance(constr, linearPointConstr)
if path:
self.linPathConstr.append(constr)
else:
self.linPointConstr.append(constr)
[docs] def addObj(self, obj, path=False):
"""A high level function that add objective function of any kind.
:param obj: an objective object.
:param path: bool, if the point objective is an integral one.
"""
if isinstance(obj, linearObj):
self.addLinearObj(obj)
elif isinstance(obj, linearPointObj):
self.addLinearPointObj(obj, path)
elif isinstance(obj, nonLinearObj):
self.addNonLinearObj(obj)
elif isinstance(obj, nonLinearPointObj):
self.addNonLinearPointObj(obj, path)
elif isinstance(obj, lqrObj):
self.addLQRObj(obj)
[docs] def addConstr(self, constr, path=False, **kwargs):
"""Add a constraint to the problem.
:param constr: a constraint object.
:param path: bool, if this constraint is a path constraint. Only applies for point constraint.
"""
if isinstance(constr, linearConstr):
self.addLinearConstr(constr)
elif isinstance(constr, linearPointConstr):
self.addLinearPointConstr(constr, path)
elif isinstance(constr, nonLinearConstr):
self.addNonLinearConstr(constr)
elif isinstance(constr, nonLinearPointConstr):
self.addNonLinearPointConstr(constr, path, **kwargs)
else:
raise NotImplementedError
[docs] def setN(self, N):
"""Set N.
:param N: the size of discretization.
"""
self.N = N
[docs] def sett0tf(self, t0, tf):
"""Set t0 and tf.
:param t0: float/ndarray (2,) allowable t0
:param tf: float/ndarray (2,) allowable tf
"""
self.t0 = t0
self.tf = tf
if np.isscalar(tf):
self.fixtf = True
else:
self.fixtf = False
assert tf[0] <= tf[1]
if np.isscalar(t0):
self.fixt0 = True
else:
self.fixt0 = False
assert t0[0] <= t0[1]
[docs] def setXbound(self, xlb, xub):
"""Set bounds on state variables.
:param xlb: ndarray, (dimx,) lower bounds on state variables.
:param xub: ndarray, (dimx,) upper bounds on state variables.
"""
if len(xlb) != self.dimx:
print('Incorrect length of xlb, it must be %d' % self.dimx)
if len(xub) != self.dimx:
print('Incorrect length of xub, it must be %d' % self.dimx)
self.xbd = [np.array(xlb), np.array(xub)]
[docs] def setUbound(self, ulb, uub):
"""Set bounds on control variables.
:param ulb: ndarray, (dimu,) lower bounds on control variables.
:param uub: ndarray, (dimu,) upper bounds on control variables.
"""
if len(ulb) != self.dimu:
print('Incorrect length of ulb, it must be %d' % self.dimu)
if len(uub) != self.dimu:
print('Incorrect length of uub, it must be %d' % self.dimu)
self.ubd = [np.array(ulb), np.array(uub)]
[docs] def setPbound(self, plb, pub):
"""Set bounds on parameter variables.
:param plb: ndarray, (dimp,) lower bounds on parameter variables.
:param pub: ndarray, (dimp,) upper bounds on parameter variables.
"""
if len(plb) != self.dimp:
print('Incorrect length of plb, it must be %d' % self.dimp)
if len(pub) != self.dimp:
print('Incorrect length of pub, it must be %d' % self.dimp)
self.pbd = [np.array(plb), np.array(pub)]
[docs] def setX0bound(self, x0lb, x0ub):
"""Set bounds on x0. This is optional but useful.
:param x0lb: ndarray, (dimx,) lower bounds on x0 variables.
:param x0ub: ndarray, (dimx,) upper bounds on x0 variables.
"""
if len(x0lb) != self.dimx:
print('Incorrect length of x0lb, it must be %d' % self.dimx)
if len(x0ub) != self.dimx:
print('Incorrect length of x0ub, it must be %d' % self.dimx)
self.x0bd = [np.array(x0lb), np.array(x0ub)]
[docs] def setXfbound(self, xflb, xfub):
"""Set bounds on xf. This is optional but useful.
:param xflb: ndarray, (dimx,) lower bounds on xf variables.
:param xfub: ndarray, (dimx,) upper bounds on xf variables.
"""
if len(xflb) != self.dimx:
print('Incorrect length of xflb, it must be %d' % self.dimx)
if len(xfub) != self.dimx:
print('Incorrect length of xfub, it must be %d' % self.dimx)
self.xfbd = [np.array(xflb), np.array(xfub)]