testGymCartpoleEnv.py

You can view and download this file on Github: testGymCartpoleEnv.py

  1#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
  2# This is an EXUDYN example
  3#
  4# Details:  This file serves as an input to testGymCartpole.py
  5#
  6# Author:   Johannes Gerstmayr, Grzegorz Orzechowski
  7# Date:     2022-05-17
  8# Update:   2023-05-20: derive from gym.Env to ensure compatibility with newer stable-baselines3
  9#
 10# Copyright:This file is part of Exudyn. Exudyn is free software. You can redistribute it and/or modify it under the terms of the Exudyn license. See 'LICENSE.txt' for more details.
 11#
 12#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 13
 14import exudyn as exu
 15from exudyn.utilities import *
 16import math
 17
 18import math
 19from typing import Optional, Union
 20
 21import numpy as np
 22
 23import gym
 24from gym import logger, spaces, Env
 25from gym.error import DependencyNotInstalled
 26
 27
 28class CartPoleEnv(Env):
 29
 30    #metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 50}
 31    metadata = {"render_modes": ["human"], "render_fps": 50}
 32
 33    def __init__(self, thresholdFactor = 1., forceFactor = 1.):
 34
 35        self.SC = exu.SystemContainer()
 36        self.mbs = self.SC.AddSystem()
 37
 38
 39        #%%+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 40        #+++++++++++++++++++++++++++++++++++++++++++++++++++++
 41        #take variables from cartpole example in openAIgym
 42        self.gravity = 9.8
 43        self.masscart = 1.0
 44        self.masspole = 0.1
 45        self.total_mass = self.masspole + self.masscart
 46        self.lengthHalf = 0.5  # actually half the pole's self.length
 47        self.length = self.lengthHalf*2
 48        self.polemass_length = self.masspole * self.length
 49        self.force_mag = 10.0*forceFactor
 50        self.stepUpdateTime = 0.02  # seconds between state updates
 51        self.kinematics_integrator = "euler"
 52
 53        # Angle at which to fail the episode
 54        self.theta_threshold_radians = thresholdFactor*12 * 2 * math.pi / 360
 55        self.x_threshold = thresholdFactor*2.4
 56
 57        high = np.array(
 58            [
 59                self.x_threshold * 2,
 60                np.finfo(np.float32).max,
 61                self.theta_threshold_radians * 2,
 62                np.finfo(np.float32).max,
 63            ],
 64            dtype=np.float32,
 65        )
 66
 67        #+++++++++++++++++++++++++++++++++++++++++++++++++++++
 68        #see https://github.com/openai/gym/blob/64b4b31d8245f6972b3d37270faf69b74908a67d/gym/core.py#L16
 69        #for Env:
 70        self.action_space = spaces.Discrete(2)
 71        self.observation_space = spaces.Box(-high, high, dtype=np.float32)
 72        #+++++++++++++++++++++++++++++++++++++++++++++++++++++
 73        self.state = None
 74        self.rendererRunning=None
 75        self.useRenderer = False #turn this on if needed
 76
 77        background = GraphicsDataCheckerBoard(point= [0,0,0], normal= [0,0,1], size=4)
 78
 79        oGround=self.mbs.AddObject(ObjectGround(referencePosition= [0,0,0],  #x-pos,y-pos,angle
 80                                           visualization=VObjectGround(graphicsData= [background])))
 81        nGround=self.mbs.AddNode(NodePointGround())
 82
 83        gCart = GraphicsDataOrthoCubePoint(size=[0.5*self.length, 0.1*self.length, 0.1*self.length],
 84                                           color=color4dodgerblue)
 85        self.nCart = self.mbs.AddNode(Rigid2D(referenceCoordinates=[0,0,0]));
 86        oCart = self.mbs.AddObject(RigidBody2D(physicsMass=self.masscart,
 87                                          physicsInertia=0.1*self.masscart, #not needed
 88                                          nodeNumber=self.nCart,
 89                                          visualization=VObjectRigidBody2D(graphicsData= [gCart])))
 90
 91        gPole = GraphicsDataOrthoCubePoint(size=[0.1*self.length, self.length, 0.1*self.length], color=color4red)
 92        self.nPole = self.mbs.AddNode(Rigid2D(referenceCoordinates=[0,0.5*self.length,0]));
 93        oPole = self.mbs.AddObject(RigidBody2D(physicsMass=self.masspole,
 94                                          physicsInertia=1e-6, #not included in original paper
 95                                          nodeNumber=self.nPole,
 96                                          visualization=VObjectRigidBody2D(graphicsData= [gPole])))
 97
 98        mCartCOM = self.mbs.AddMarker(MarkerNodePosition(nodeNumber=self.nCart))
 99        mPoleCOM = self.mbs.AddMarker(MarkerNodePosition(nodeNumber=self.nPole))
100        mPoleJoint = self.mbs.AddMarker(MarkerBodyPosition(bodyNumber=oPole, localPosition=[0,-0.5*self.length,0]))
101
102        mCartCoordX = self.mbs.AddMarker(MarkerNodeCoordinate(nodeNumber=self.nCart, coordinate=0))
103        mCartCoordY = self.mbs.AddMarker(MarkerNodeCoordinate(nodeNumber=self.nCart, coordinate=1))
104        mGroundNode = self.mbs.AddMarker(MarkerNodeCoordinate(nodeNumber=nGround, coordinate=0))
105
106        #gravity
107        self.mbs.AddLoad(Force(markerNumber=mCartCOM, loadVector=[0,-self.masscart*self.gravity,0]))
108        self.mbs.AddLoad(Force(markerNumber=mPoleCOM, loadVector=[0,-self.masspole*self.gravity,0]))
109
110        #control force
111        self.lControl = self.mbs.AddLoad(LoadCoordinate(markerNumber=mCartCoordX, load=1.))
112
113        #constraints:
114        self.mbs.AddObject(RevoluteJoint2D(markerNumbers=[mCartCOM, mPoleJoint]))
115        self.mbs.AddObject(CoordinateConstraint(markerNumbers=[mCartCoordY, mGroundNode]))
116
117
118
119
120        #%%++++++++++++++++++++++++
121        self.mbs.Assemble() #computes initial vector
122
123        self.simulationSettings = exu.SimulationSettings() #takes currently set values or default values
124
125
126        self.simulationSettings.timeIntegration.numberOfSteps = 1
127        self.simulationSettings.timeIntegration.endTime = 0 #will be overwritten in step
128        self.simulationSettings.timeIntegration.verboseMode = 0
129        self.simulationSettings.solutionSettings.writeSolutionToFile = False
130        #self.simulationSettings.timeIntegration.simulateInRealtime = True
131
132        self.simulationSettings.timeIntegration.newton.useModifiedNewton = True
133
134        self.SC.visualizationSettings.general.drawWorldBasis=True
135        self.SC.visualizationSettings.general.graphicsUpdateInterval = 0.01 #50Hz
136
137        self.simulationSettings.solutionSettings.solutionInformation = "Open AI gym"
138
139        self.dynamicSolver = exudyn.MainSolverImplicitSecondOrder()
140        self.dynamicSolver.InitializeSolver(self.mbs, self.simulationSettings)
141        self.dynamicSolver.SolveSteps(self.mbs, self.simulationSettings) #to initialize all data
142
143
144    def integrateStep(self, force):
145        #exudyn simulation part
146        #index 2 solver
147        self.mbs.SetLoadParameter(self.lControl, 'load', force)
148
149        #progress integration time
150        currentTime = self.simulationSettings.timeIntegration.endTime
151        self.simulationSettings.timeIntegration.startTime = currentTime
152        self.simulationSettings.timeIntegration.endTime = currentTime+self.stepUpdateTime
153
154        # exu.SolveDynamic(self.mbs, self.simulationSettings, solverType=exu.DynamicSolverType.TrapezoidalIndex2,
155        #                  updateInitialValues=True) #use final value as new initial values
156
157        self.dynamicSolver.InitializeSolverInitialConditions(self.mbs, self.simulationSettings)
158        self.dynamicSolver.SolveSteps(self.mbs, self.simulationSettings)
159        currentState = self.mbs.systemData.GetSystemState() #get current values
160        self.mbs.systemData.SetSystemState(systemStateList=currentState,
161                                        configuration = exu.ConfigurationType.Initial)
162        self.mbs.systemData.SetODE2Coordinates_tt(coordinates = self.mbs.systemData.GetODE2Coordinates_tt(),
163                                                configuration = exu.ConfigurationType.Initial)
164
165
166
167
168    def step(self, action):
169        err_msg = f"{action!r} ({type(action)}) invalid"
170        assert self.action_space.contains(action), err_msg
171        assert self.state is not None, "Call reset before using step method."
172        x, x_dot, theta, theta_dot = self.state
173
174        force = self.force_mag if action == 1 else -self.force_mag
175
176        #++++++++++++++++++++++++++++++++++++++++++++++++++
177        #++++++++++++++++++++++++++++++++++++++++++++++++++
178        self.integrateStep(force)
179        #+++++++++++++++++++++++++
180        #compute some output:
181        cartPosX = self.mbs.GetNodeOutput(self.nCart, variableType=exu.OutputVariableType.Coordinates)[0]
182        poleAngle = self.mbs.GetNodeOutput(self.nPole, variableType=exu.OutputVariableType.Coordinates)[2]
183        cartPosX_t = self.mbs.GetNodeOutput(self.nCart, variableType=exu.OutputVariableType.Coordinates_t)[0]
184        poleAngle_t = self.mbs.GetNodeOutput(self.nPole, variableType=exu.OutputVariableType.Coordinates_t)[2]
185
186        #finally write updated state:
187        self.state = (cartPosX, cartPosX_t, poleAngle, poleAngle_t)
188        #++++++++++++++++++++++++++++++++++++++++++++++++++
189        #++++++++++++++++++++++++++++++++++++++++++++++++++
190
191        done = bool(
192            cartPosX < -self.x_threshold
193            or cartPosX > self.x_threshold
194            or poleAngle < -self.theta_threshold_radians
195            or poleAngle > self.theta_threshold_radians
196        )
197
198        if not done:
199            reward = 1.0
200        elif self.steps_beyond_done is None:
201            # Pole just fell!
202            self.steps_beyond_done = 0
203            reward = 1.0
204        else:
205            if self.steps_beyond_done == 0:
206                logger.warn(
207                    "You are calling 'step()' even though this "
208                    "environment has already returned done = True. You "
209                    "should always call 'reset()' once you receive 'done = "
210                    "True' -- any further steps are undefined behavior."
211                )
212            self.steps_beyond_done += 1
213            reward = 0.0
214
215        return np.array(self.state, dtype=np.float32), reward, done, {}
216
217
218    def reset(
219        self,
220        *,
221        seed: Optional[int] = None,
222        return_info: bool = False,
223        options: Optional[dict] = None,
224    ):
225        #super().reset(seed=seed)
226        #self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
227        self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
228        self.steps_beyond_done = None
229
230
231        #+++++++++++++++++++++++++++++++++++++++++++++
232        #set initial values:
233        #+++++++++++++++++++++++++++++++++++++++++++++
234        #set specific initial state:
235        (xCart, xCart_t, phiPole, phiPole_t) = self.state
236
237        initialValues = np.zeros(6)
238        initialValues_t = np.zeros(6)
239        initialValues[0] = xCart
240        initialValues[3+0] = xCart - 0.5*self.length * sin(phiPole)
241        initialValues[3+1] = 0.5*self.length * (cos(phiPole)-1)
242        initialValues[3+2] = phiPole
243
244        initialValues_t[0] = xCart_t
245        initialValues_t[3+0] = xCart_t - phiPole_t*0.5*self.length * cos(phiPole)
246        initialValues_t[3+1] = -0.5*self.length * sin(phiPole)  * phiPole_t
247        initialValues_t[3+2] = phiPole_t
248
249        self.mbs.systemData.SetODE2Coordinates(initialValues, exu.ConfigurationType.Initial)
250        self.mbs.systemData.SetODE2Coordinates_t(initialValues_t, exu.ConfigurationType.Initial)
251
252        self.simulationSettings.timeIntegration.endTime = 0
253        #self.dynamicSolver.FinalizeSolver(self.mbs, self.simulationSettings) #needed to update initial conditions
254        self.dynamicSolver.InitializeSolver(self.mbs, self.simulationSettings) #needed to update initial conditions
255        # self.dynamicSolver.InitializeSolverInitialConditions(self.mbs, self.simulationSettings) #needed to update initial conditions
256
257        if not return_info:
258            return np.array(self.state, dtype=np.float32)
259        else:
260            return np.array(self.state, dtype=np.float32), {}
261
262    def render(self, mode="human"):
263        if self.rendererRunning==None and self.useRenderer:
264            exu.StartRenderer()
265            self.rendererRunning = True
266
267    def close(self):
268        self.dynamicSolver.FinalizeSolver(self.mbs, self.simulationSettings)
269        if self.rendererRunning==True:
270            # SC.WaitForRenderEngineStopFlag()
271            exu.StopRenderer() #safely close rendering window!
272
273
274
275# #+++++++++++++++++++++++++++++++++++++++++++++
276# #reset:
277# self.mbs.systemData.SetODE2Coordinates(initialValues, exu.ConfigurationType.Initial)
278# self.mbs.systemData.SetODE2Coordinates_t(initialValues, exu.ConfigurationType.Initial)
279# self.mbs.systemData.SetODE2Coordinates_tt(initialValues, exu.ConfigurationType.Initial)