PKnNcatomacos/AXCallbacks.py# Copyright (c) 2010 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. def elemDisappearedCallback(retelem, obj, **kwargs): """Callback for checking if a UI element is no longer onscreen. kwargs should contains some unique set of identifier (e.g. title/value, role) Returns: Boolean """ return not obj.findFirstR(**kwargs) def returnElemCallback(retelem): """Callback for when a sheet appears. Returns: element returned by observer callback """ return retelem PKnNRAAatomacos/AXClasses.py# Copyright (c) 2010-2011 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. import fnmatch import time from collections import deque import AppKit import Quartz from atomacos import a11y, errors from atomacos import AXKeyboard from atomacos import AXCallbacks from atomacos import AXKeyCodeConstants from atomacos.notification import Observer class BaseAXUIElement(a11y.AXUIElement): """Base class for UI elements. Implements four major things: 1. Factory class methods for getAppRef and getSystemObject which properly instantiate the class. 2. Generators and methods for finding objects for use in child classes. 3. __getattribute__ call for invoking actions. 4. waitFor utility based upon AX notifications. """ def __init__(self, ref=None): super(BaseAXUIElement, self).__init__(ref=ref) self.eventList = deque() @classmethod def _getRunningApps(cls): """Get a list of the running applications.""" return a11y.get_running_apps() @classmethod def getAppRefByPid(cls, pid): """Get the top level element for the application specified by pid.""" return cls.from_pid(pid) @classmethod def getAppRefByBundleId(cls, bundleId): """ Get the top level element for the application with the specified bundle ID, such as com.vmware.fusion. """ return cls.from_bundle_id(bundleId) @classmethod def getAppRefByLocalizedName(cls, name): """Get the top level element for the application with the specified localized name, such as VMware Fusion. Wildcards are also allowed. """ # Refresh the runningApplications list return cls.from_localized_name(name) @classmethod def getFrontmostApp(cls): """Get the current frontmost application. Raise a ValueError exception if no GUI applications are found. """ # Refresh the runningApplications list return cls.frontmost() @classmethod def getAnyAppWithWindow(cls): """Get a random app that has windows. Raise a ValueError exception if no GUI applications are found. """ # Refresh the runningApplications list return cls.with_window() @classmethod def getSystemObject(cls): """Get the top level system accessibility object.""" return cls.systemwide() @classmethod def setSystemWideTimeout(cls, timeout=0.0): """Set the system-wide accessibility timeout. Optional: timeout (non-negative float; defaults to 0) A value of 0 will reset the timeout to the system default. Returns: None. """ return cls.set_systemwide_timeout(timeout) @staticmethod def launchAppByBundleId(bundleID): """Launch the application with the specified bundle ID""" # NSWorkspaceLaunchAllowingClassicStartup does nothing on any # modern system that doesn't have the classic environment installed. # Encountered a bug when passing 0 for no options on 10.6 PyObjC. a11y.AXUIElement.launch_app_by_bundle_id(bundleID) @staticmethod def launchAppByBundlePath(bundlePath, arguments=None): """Launch app with a given bundle path. Return True if succeed. """ return a11y.AXUIElement.launch_app_by_bundle_path( bundlePath, arguments ) @staticmethod def terminateAppByBundleId(bundleID): """Terminate app with a given bundle ID. Requires 10.6. Return True if succeed. """ return a11y.AXUIElement.terminate_app_by_bundle_id(bundleID) @staticmethod def set_systemwide_timeout(timeout=0.0): """Set the system-wide accessibility timeout. Optional: timeout (non-negative float; defaults to 0) A value of 0 will reset the timeout to the system default. Returns: None. """ return BaseAXUIElement.systemwide().setTimeout(timeout) def setTimeout(self, timeout=0.0): """Set the accessibiltiy API timeout on the given reference. Optional: timeout (non-negative float; defaults to 0) A value of 0 will reset the timeout to the system-wide value Returns: None """ self.set_timeout(timeout) def _postQueuedEvents(self, interval=0.01): """Private method to post queued events (e.g. Quartz events). Each event in queue is a tuple (event call, args to event call). """ while len(self.eventList) > 0: (nextEvent, args) = self.eventList.popleft() nextEvent(*args) time.sleep(interval) def _clearEventQueue(self): """Clear the event queue.""" if hasattr(self, "eventList"): self.eventList.clear() def _queueEvent(self, event, args): """Private method to queue events to run. Each event in queue is a tuple (event call, args to event call). """ self.eventList.append((event, args)) def _addKeyToQueue(self, keychr, modFlags=0, globally=False): """Add keypress to queue. Parameters: key character or constant referring to a non-alpha-numeric key (e.g. RETURN or TAB) modifiers global or app specific Returns: None or raise ValueError exception. """ # Awkward, but makes modifier-key-only combinations possible # (since sendKeyWithModifiers() calls this) if not keychr: return if not hasattr(self, "keyboard"): self.keyboard = AXKeyboard.loadKeyboard() if keychr in self.keyboard["upperSymbols"] and not modFlags: self._sendKeyWithModifiers( keychr, [AXKeyCodeConstants.SHIFT], globally ) return if keychr.isupper() and not modFlags: self._sendKeyWithModifiers( keychr.lower(), [AXKeyCodeConstants.SHIFT], globally ) return if keychr not in self.keyboard: self._clearEventQueue() raise ValueError("Key %s not found in keyboard layout" % keychr) # Press the key keyDown = Quartz.CGEventCreateKeyboardEvent( None, self.keyboard[keychr], True ) # Release the key keyUp = Quartz.CGEventCreateKeyboardEvent( None, self.keyboard[keychr], False ) # Set modflags on keyDown (default None): Quartz.CGEventSetFlags(keyDown, modFlags) # Set modflags on keyUp: Quartz.CGEventSetFlags(keyUp, modFlags) # Post the event to the given app if not globally: self._queueEvent( Quartz.CGEventPostToPid, (self._getPid(), keyDown) ) self._queueEvent(Quartz.CGEventPostToPid, (self._getPid(), keyUp)) else: self._queueEvent(Quartz.CGEventPost, (0, keyDown)) self._queueEvent(Quartz.CGEventPost, (0, keyUp)) def _sendKey(self, keychr, modFlags=0, globally=False): """Send one character with no modifiers. Parameters: key character or constant referring to a non-alpha-numeric key (e.g. RETURN or TAB) modifier flags, global or app specific Returns: None or raise ValueError exception """ escapedChrs = { "\n": AXKeyCodeConstants.RETURN, "\r": AXKeyCodeConstants.RETURN, "\t": AXKeyCodeConstants.TAB, } if keychr in escapedChrs: keychr = escapedChrs[keychr] self._addKeyToQueue(keychr, modFlags, globally=globally) self._postQueuedEvents() def _sendKeys(self, keystr): """Send a series of characters with no modifiers. Parameters: keystr Returns: None or raise ValueError exception """ for nextChr in keystr: self._sendKey(nextChr) def _pressModifiers(self, modifiers, pressed=True, globally=False): """Press given modifiers (provided in list form). Parameters: modifiers list, global or app specific Optional: keypressed state (default is True (down)) Returns: Unsigned int representing flags to set """ if not isinstance(modifiers, list): raise TypeError("Please provide modifiers in list form") if not hasattr(self, "keyboard"): self.keyboard = AXKeyboard.loadKeyboard() modFlags = 0 # Press given modifiers for nextMod in modifiers: if nextMod not in self.keyboard: errStr = "Key %s not found in keyboard layout" self._clearEventQueue() raise ValueError(errStr % self.keyboard[nextMod]) modEvent = Quartz.CGEventCreateKeyboardEvent( Quartz.CGEventSourceCreate(0), self.keyboard[nextMod], pressed ) if not pressed: # Clear the modflags: Quartz.CGEventSetFlags(modEvent, 0) if globally: self._queueEvent(Quartz.CGEventPost, (0, modEvent)) else: self._queueEvent( Quartz.CGEventPostToPid, (self._getPid(), modEvent) ) # Add the modifier flags modFlags += AXKeyboard.modKeyFlagConstants[nextMod] return modFlags def _holdModifierKeys(self, modifiers): """Hold given modifier keys (provided in list form). Parameters: modifiers list Returns: Unsigned int representing flags to set """ modFlags = self._pressModifiers(modifiers) # Post the queued keypresses: self._postQueuedEvents() return modFlags def _releaseModifiers(self, modifiers, globally=False): """Release given modifiers (provided in list form). Parameters: modifiers list Returns: None """ # Release them in reverse order from pressing them: modifiers.reverse() modFlags = self._pressModifiers( modifiers, pressed=False, globally=globally ) return modFlags def _releaseModifierKeys(self, modifiers): """Release given modifier keys (provided in list form). Parameters: modifiers list Returns: Unsigned int representing flags to set """ modFlags = self._releaseModifiers(modifiers) # Post the queued keypresses: self._postQueuedEvents() return modFlags @staticmethod def _isSingleCharacter(keychr): """Check whether given keyboard character is a single character. Parameters: key character which will be checked. Returns: True when given key character is a single character. """ if not keychr: return False # Regular character case. if len(keychr) == 1: return True # Tagged character case. return ( keychr.count("<") == 1 and keychr.count(">") == 1 and keychr[0] == "<" and keychr[-1] == ">" ) def _sendKeyWithModifiers(self, keychr, modifiers, globally=False): """Send one character with the given modifiers pressed. Parameters: key character, list of modifiers, global or app specific Returns: None or raise ValueError exception """ if not self._isSingleCharacter(keychr): raise ValueError("Please provide only one character to send") if not hasattr(self, "keyboard"): self.keyboard = AXKeyboard.loadKeyboard() modFlags = self._pressModifiers(modifiers, globally=globally) # Press the non-modifier key self._sendKey(keychr, modFlags, globally=globally) # Release the modifiers self._releaseModifiers(modifiers, globally=globally) # Post the queued keypresses: self._postQueuedEvents() def _queueMouseButton( self, coord, mouseButton, modFlags, clickCount=1, dest_coord=None ): """Private method to handle generic mouse button clicking. Parameters: coord (x, y) to click, mouseButton (e.g., kCGMouseButtonLeft), modFlags set (int) Optional: clickCount (default 1; set to 2 for double-click; 3 for triple-click on host) Returns: None """ # For now allow only left and right mouse buttons: mouseButtons = { Quartz.kCGMouseButtonLeft: "LeftMouse", Quartz.kCGMouseButtonRight: "RightMouse", } if mouseButton not in mouseButtons: raise ValueError("Mouse button given not recognized") eventButtonDown = getattr( Quartz, "kCGEvent%sDown" % mouseButtons[mouseButton] ) eventButtonUp = getattr( Quartz, "kCGEvent%sUp" % mouseButtons[mouseButton] ) eventButtonDragged = getattr( Quartz, "kCGEvent%sDragged" % mouseButtons[mouseButton] ) # Press the button buttonDown = Quartz.CGEventCreateMouseEvent( None, eventButtonDown, coord, mouseButton ) # Set modflags (default None) on button down: Quartz.CGEventSetFlags(buttonDown, modFlags) # Set the click count on button down: Quartz.CGEventSetIntegerValueField( buttonDown, Quartz.kCGMouseEventClickState, int(clickCount) ) if dest_coord: # Drag and release the button buttonDragged = Quartz.CGEventCreateMouseEvent( None, eventButtonDragged, dest_coord, mouseButton ) # Set modflags on the button dragged: Quartz.CGEventSetFlags(buttonDragged, modFlags) buttonUp = Quartz.CGEventCreateMouseEvent( None, eventButtonUp, dest_coord, mouseButton ) else: # Release the button buttonUp = Quartz.CGEventCreateMouseEvent( None, eventButtonUp, coord, mouseButton ) # Set modflags on the button up: Quartz.CGEventSetFlags(buttonUp, modFlags) # Set the click count on button up: Quartz.CGEventSetIntegerValueField( buttonUp, Quartz.kCGMouseEventClickState, int(clickCount) ) # Queue the events self._queueEvent( Quartz.CGEventPost, (Quartz.kCGSessionEventTap, buttonDown) ) if dest_coord: self._queueEvent( Quartz.CGEventPost, (Quartz.kCGHIDEventTap, buttonDragged) ) self._queueEvent( Quartz.CGEventPost, (Quartz.kCGSessionEventTap, buttonUp) ) def _leftMouseDragged(self, stopCoord, strCoord, speed): """Private method to handle generic mouse left button dragging and dropping. Parameters: stopCoord(x,y) drop point Optional: strCoord (x, y) drag point, default (0,0) get current mouse position speed (int) 1 to unlimit, simulate mouse moving action from some special requirement Returns: None """ # Get current position as start point if strCoord not given if strCoord == (0, 0): loc = AppKit.NSEvent.mouseLocation() strCoord = (loc.x, Quartz.CGDisplayPixelsHigh(0) - loc.y) # Press left button down pressLeftButton = Quartz.CGEventCreateMouseEvent( None, Quartz.kCGEventLeftMouseDown, strCoord, Quartz.kCGMouseButtonLeft, ) # Queue the events Quartz.CGEventPost(Quartz.CoreGraphics.kCGHIDEventTap, pressLeftButton) # Wait for reponse of system, a fuzzy icon appears time.sleep(5) # Simulate mouse moving speed, k is slope speed = round(1 / float(speed), 2) xmoved = stopCoord[0] - strCoord[0] ymoved = stopCoord[1] - strCoord[1] if ymoved == 0: raise ValueError("Not support horizontal moving") else: k = abs(ymoved / xmoved) if xmoved != 0: for xpos in range(int(abs(xmoved))): if xmoved > 0 and ymoved > 0: currcoord = (strCoord[0] + xpos, strCoord[1] + xpos * k) elif xmoved > 0 and ymoved < 0: currcoord = (strCoord[0] + xpos, strCoord[1] - xpos * k) elif xmoved < 0 and ymoved < 0: currcoord = (strCoord[0] - xpos, strCoord[1] - xpos * k) elif xmoved < 0 and ymoved > 0: currcoord = (strCoord[0] - xpos, strCoord[1] + xpos * k) # Drag with left button dragLeftButton = Quartz.CGEventCreateMouseEvent( None, Quartz.kCGEventLeftMouseDragged, currcoord, Quartz.kCGMouseButtonLeft, ) Quartz.CGEventPost( Quartz.CoreGraphics.kCGHIDEventTap, dragLeftButton ) # Wait for reponse of system time.sleep(speed) else: raise ValueError("Not support vertical moving") upLeftButton = Quartz.CGEventCreateMouseEvent( None, Quartz.kCGEventLeftMouseUp, stopCoord, Quartz.kCGMouseButtonLeft, ) # Wait for reponse of system, a plus icon appears time.sleep(5) # Up left button up Quartz.CGEventPost(Quartz.CoreGraphics.kCGHIDEventTap, upLeftButton) def _waitFor(self, timeout, notification, **kwargs): """Wait for a particular UI event to occur; this can be built upon in NativeUIElement for specific convenience methods. """ callback = self._matchOther retelem = None callbackArgs = None callbackKwargs = None # Allow customization of the callback, though by default use the basic # _match() method if "callback" in kwargs: callback = kwargs["callback"] del kwargs["callback"] # Deal with these only if callback is provided: if "args" in kwargs: if not isinstance(kwargs["args"], tuple): errStr = "Notification callback args not given as a tuple" raise TypeError(errStr) # If args are given, notification will pass back the returned # element in the first positional arg callbackArgs = kwargs["args"] del kwargs["args"] if "kwargs" in kwargs: if not isinstance(kwargs["kwargs"], dict): errStr = "Notification callback kwargs not given as a dict" raise TypeError(errStr) callbackKwargs = kwargs["kwargs"] del kwargs["kwargs"] # If kwargs are not given as a dictionary but individually listed # need to update the callbackKwargs dict with the remaining items in # kwargs if kwargs: if callbackKwargs: callbackKwargs.update(kwargs) else: callbackKwargs = kwargs else: if retelem: callbackArgs = (retelem,) # Pass the kwargs to the default callback callbackKwargs = kwargs return Observer(self).set_notification( timeout, notification, callback, callbackArgs, callbackKwargs ) def waitForFocusToMatchCriteria(self, timeout=10, **kwargs): """Convenience method to wait for focused element to change (to element matching kwargs criteria). Returns: Element or None """ def _matchFocused(retelem, **kwargs): return retelem if retelem._match(**kwargs) else None retelem = None return self._waitFor( timeout, "AXFocusedUIElementChanged", callback=_matchFocused, args=(retelem,), **kwargs ) def _getActions(self): """Retrieve a list of actions supported by the object.""" actions = self.ax_actions # strip leading AX from actions - help distinguish them from attributes return [action[2:] for action in actions] def _performAction(self, action): """Perform the specified action.""" self._performAction("AX%s" % action) def _generateChildren(self): """Generator which yields all AXChildren of the object.""" try: children = self.AXChildren except errors.AXError: return if children: for child in children: yield child def _generateChildrenR(self, target=None): """Generator which recursively yields all AXChildren of the object.""" if target is None: target = self if "AXChildren" in target.ax_attributes: for child in target.AXChildren: yield child for c in self._generateChildrenR(child): yield c else: return def _match(self, **kwargs): """Method which indicates if the object matches specified criteria. Match accepts criteria as kwargs and looks them up on attributes. Actual matching is performed with fnmatch, so shell-like wildcards work within match strings. Examples: obj._match(AXTitle='Terminal*') obj._match(AXRole='TextField', AXRoleDescription='search text field') """ for k in kwargs.keys(): try: val = getattr(self, k) except AttributeError: return False # Not all values may be strings (e.g. size, position) if isinstance(val, str): if not fnmatch.fnmatch(val, kwargs[k]): return False else: if val != kwargs[k]: return False return True def _matchOther(self, obj, **kwargs): """Perform _match but on another object, not self.""" if obj is not None: # Need to check that the returned UI element wasn't destroyed first: if self._findFirstR(**kwargs): return obj._match(**kwargs) return False def _generateFind(self, **kwargs): """Generator which yields matches on AXChildren.""" for needle in self._generateChildren(): if needle._match(**kwargs): yield needle def _generateFindR(self, **kwargs): """Generator which yields matches on AXChildren and their children.""" for needle in self._generateChildrenR(): if needle._match(**kwargs): yield needle def _findAll(self, **kwargs): """Return a list of all children that match the specified criteria.""" result = [] for item in self._generateFind(**kwargs): result.append(item) return result def _findAllR(self, **kwargs): """Return a list of all children (recursively) that match the specified criteria. """ result = [] for item in self._generateFindR(**kwargs): result.append(item) return result def _findFirst(self, **kwargs): """Return the first object that matches the criteria.""" for item in self._generateFind(**kwargs): return item def _findFirstR(self, **kwargs): """Search recursively for the first object that matches the criteria.""" for item in self._generateFindR(**kwargs): return item def _getApplication(self): """Get the base application UIElement. If the UIElement is a child of the application, it will try to get the AXParent until it reaches the top application level element. """ app = self while "AXParent" in app.ax_attributes: app = app.AXParent return app def _menuItem(self, menuitem, *args): """Return the specified menu item. Example - refer to items by name: app._menuItem(app.AXMenuBar, 'File', 'New').Press() app._menuItem(app.AXMenuBar, 'Edit', 'Insert', 'Line Break').Press() Refer to items by index: app._menuitem(app.AXMenuBar, 1, 0).Press() Refer to items by mix-n-match: app._menuitem(app.AXMenuBar, 1, 'About TextEdit').Press() """ self._activate() for item in args: # If the item has an AXMenu as a child, navigate into it. # This seems like a silly abstraction added by apple's a11y api. if menuitem.AXChildren[0].AXRole == "AXMenu": menuitem = menuitem.AXChildren[0] # Find AXMenuBarItems and AXMenuItems using a handy wildcard try: menuitem = menuitem.AXChildren[int(item)] except ValueError: menuitem = menuitem.findFirst( AXRole="AXMenu*Item", AXTitle=item ) return menuitem def _activate(self): """Activate the application (bringing menus and windows forward).""" ra = AppKit.NSRunningApplication app = ra.runningApplicationWithProcessIdentifier_(self.pid) # NSApplicationActivateAllWindows | NSApplicationActivateIgnoringOtherApps # == 3 - PyObjC in 10.6 does not expose these constants though so I have # to use the int instead of the symbolic names app.activateWithOptions_(3) def _getBundleId(self): """Return the bundle ID of the application.""" ra = AppKit.NSRunningApplication app = ra.runningApplicationWithProcessIdentifier_(self._getPid()) return app.bundleIdentifier() def _getLocalizedName(self): """Return the localized name of the application.""" return self._getApplication().AXTitle def __getattr__(self, name): """Handle attribute requests in several ways: 1. If it starts with AX, it is probably an a11y attribute. Pass it to the handler in _a11y which will determine that for sure. 2. See if the attribute is an action which can be invoked on the UIElement. If so, return a function that will invoke the attribute. """ if "AX" + name in self.ax_actions: action = super(BaseAXUIElement, self).__getattr__("AX" + name) def performSpecifiedAction(): # activate the app before performing the specified action self._activate() return action() return performSpecifiedAction else: return super(BaseAXUIElement, self).__getattr__(name) class NativeUIElement(BaseAXUIElement): """NativeUIElement class - expose the accessibility API in the simplest, most natural way possible. """ def getAttributes(self): """Get a list of the attributes available on the element.""" return self.ax_attributes def getActions(self): """Return a list of the actions available on the element.""" return self.ax_actions def setString(self, attribute, string): """Set the specified attribute to the specified string.""" return self._setString(attribute, string) def findFirst(self, **kwargs): """Return the first object that matches the criteria.""" return self._findFirst(**kwargs) def findFirstR(self, **kwargs): """Search recursively for the first object that matches the criteria. """ return self._findFirstR(**kwargs) def findAll(self, **kwargs): """Return a list of all children that match the specified criteria.""" return self._findAll(**kwargs) def findAllR(self, **kwargs): """Return a list of all children (recursively) that match the specified criteria. """ return self._findAllR(**kwargs) def getElementAtPosition(self, coord): """Return the AXUIElement at the given coordinates. If self is behind other windows, this function will return self. """ return self._getElementAtPosition(float(coord[0]), float(coord[1])) def activate(self): """Activate the application (bringing menus and windows forward)""" return self._activate() def getApplication(self): """Get the base application UIElement. If the UIElement is a child of the application, it will try to get the AXParent until it reaches the top application level element. """ return self._getApplication() def menuItem(self, *args): """Return the specified menu item. Example - refer to items by name: app.menuItem('File', 'New').Press() app.menuItem('Edit', 'Insert', 'Line Break').Press() Refer to items by index: app.menuitem(1, 0).Press() Refer to items by mix-n-match: app.menuitem(1, 'About TextEdit').Press() """ menuitem = self._getApplication().AXMenuBar return self._menuItem(menuitem, *args) def popUpItem(self, *args): """Return the specified item in a pop up menu.""" self.Press() time.sleep(0.5) return self._menuItem(self, *args) def getBundleId(self): """Return the bundle ID of the application.""" return self._getBundleId() def getLocalizedName(self): """Return the localized name of the application.""" return self._getLocalizedName() def sendKey(self, keychr): """Send one character with no modifiers.""" return self._sendKey(keychr) def sendGlobalKey(self, keychr): """Send one character without modifiers to the system. It will not send an event directly to the application, system will dispatch it to the window which has keyboard focus. Parameters: keychr - Single keyboard character which will be sent. """ return self._sendKey(keychr, globally=True) def sendKeys(self, keystr): """Send a series of characters with no modifiers.""" return self._sendKeys(keystr) def pressModifiers(self, modifiers): """Hold modifier keys (e.g. [Option]).""" return self._holdModifierKeys(modifiers) def releaseModifiers(self, modifiers): """Release modifier keys (e.g. [Option]).""" return self._releaseModifierKeys(modifiers) def sendKeyWithModifiers(self, keychr, modifiers): """Send one character with modifiers pressed Parameters: key character, modifiers (list) (e.g. [SHIFT] or [COMMAND, SHIFT] (assuming you've first used from pyatom.AXKeyCodeConstants import *)) """ return self._sendKeyWithModifiers(keychr, modifiers, False) def sendGlobalKeyWithModifiers(self, keychr, modifiers): """Global send one character with modifiers pressed. See sendKeyWithModifiers """ return self._sendKeyWithModifiers(keychr, modifiers, True) def dragMouseButtonLeft(self, coord, dest_coord, interval=0.5): """Drag the left mouse button without modifiers pressed. Parameters: coordinates to click on screen (tuple (x, y)) dest coordinates to drag to (tuple (x, y)) interval to send event of btn down, drag and up Returns: None """ modFlags = 0 self._queueMouseButton( coord, Quartz.kCGMouseButtonLeft, modFlags, dest_coord=dest_coord ) self._postQueuedEvents(interval=interval) def doubleClickDragMouseButtonLeft(self, coord, dest_coord, interval=0.5): """Double-click and drag the left mouse button without modifiers pressed. Parameters: coordinates to double-click on screen (tuple (x, y)) dest coordinates to drag to (tuple (x, y)) interval to send event of btn down, drag and up Returns: None """ modFlags = 0 self._queueMouseButton( coord, Quartz.kCGMouseButtonLeft, modFlags, dest_coord=dest_coord ) self._queueMouseButton( coord, Quartz.kCGMouseButtonLeft, modFlags, dest_coord=dest_coord, clickCount=2, ) self._postQueuedEvents(interval=interval) def clickMouseButtonLeft(self, coord, interval=None): """Click the left mouse button without modifiers pressed. Parameters: coordinates to click on screen (tuple (x, y)) Returns: None """ modFlags = 0 self._queueMouseButton(coord, Quartz.kCGMouseButtonLeft, modFlags) if interval: self._postQueuedEvents(interval=interval) else: self._postQueuedEvents() def clickMouseButtonRight(self, coord): """Click the right mouse button without modifiers pressed. Parameters: coordinates to click on scren (tuple (x, y)) Returns: None """ modFlags = 0 self._queueMouseButton(coord, Quartz.kCGMouseButtonRight, modFlags) self._postQueuedEvents() def clickMouseButtonLeftWithMods(self, coord, modifiers, interval=None): """Click the left mouse button with modifiers pressed. Parameters: coordinates to click; modifiers (list) (e.g. [SHIFT] or [COMMAND, SHIFT] (assuming you've first used from pyatom.AXKeyCodeConstants import *)) Returns: None """ modFlags = self._pressModifiers(modifiers) self._queueMouseButton(coord, Quartz.kCGMouseButtonLeft, modFlags) self._releaseModifiers(modifiers) if interval: self._postQueuedEvents(interval=interval) else: self._postQueuedEvents() def clickMouseButtonRightWithMods(self, coord, modifiers): """Click the right mouse button with modifiers pressed. Parameters: coordinates to click; modifiers (list) Returns: None """ modFlags = self._pressModifiers(modifiers) self._queueMouseButton(coord, Quartz.kCGMouseButtonRight, modFlags) self._releaseModifiers(modifiers) self._postQueuedEvents() def leftMouseDragged(self, stopCoord, strCoord=(0, 0), speed=1): """Click the left mouse button and drag object. Parameters: stopCoord, the position of dragging stopped strCoord, the position of dragging started (0,0) will get current position speed is mouse moving speed, 0 to unlimited Returns: None """ self._leftMouseDragged(stopCoord, strCoord, speed) def doubleClickMouse(self, coord): """Double-click primary mouse button. Parameters: coordinates to click (assume primary is left button) Returns: None """ modFlags = 0 self._queueMouseButton(coord, Quartz.kCGMouseButtonLeft, modFlags) # This is a kludge: # If directed towards a Fusion VM the clickCount gets ignored and this # will be seen as a single click, so in sequence this will be a double- # click # Otherwise to a host app only this second one will count as a double- # click self._queueMouseButton( coord, Quartz.kCGMouseButtonLeft, modFlags, clickCount=2 ) self._postQueuedEvents() def doubleMouseButtonLeftWithMods(self, coord, modifiers): """Click the left mouse button with modifiers pressed. Parameters: coordinates to click; modifiers (list) Returns: None """ modFlags = self._pressModifiers(modifiers) self._queueMouseButton(coord, Quartz.kCGMouseButtonLeft, modFlags) self._queueMouseButton( coord, Quartz.kCGMouseButtonLeft, modFlags, clickCount=2 ) self._releaseModifiers(modifiers) self._postQueuedEvents() def tripleClickMouse(self, coord): """Triple-click primary mouse button. Parameters: coordinates to click (assume primary is left button) Returns: None """ # Note above re: double-clicks applies to triple-clicks modFlags = 0 for _ in range(2): self._queueMouseButton(coord, Quartz.kCGMouseButtonLeft, modFlags) self._queueMouseButton( coord, Quartz.kCGMouseButtonLeft, modFlags, clickCount=3 ) self._postQueuedEvents() def waitFor(self, timeout, notification, **kwargs): """Generic wait for a UI event that matches the specified criteria to occur. For customization of the callback, use keyword args labeled 'callback', 'args', and 'kwargs' for the callback fn, callback args, and callback kwargs, respectively. Also note that on return, the observer-returned UI element will be included in the first argument if 'args' are given. Note also that if the UI element is destroyed, callback should not use it, otherwise the function will hang. """ return self._waitFor(timeout, notification, **kwargs) def waitForCreation(self, timeout=10, notification="AXCreated"): """Convenience method to wait for creation of some UI element. Returns: The element created """ callback = AXCallbacks.returnElemCallback retelem = None args = (retelem,) return self.waitFor( timeout, notification, callback=callback, args=args ) def waitForWindowToAppear(self, winName, timeout=10): """Convenience method to wait for a window with the given name to appear. Returns: Boolean """ return self.waitFor(timeout, "AXWindowCreated", AXTitle=winName) def waitForWindowToDisappear(self, winName, timeout=10): """Convenience method to wait for a window with the given name to disappear. Returns: Boolean """ callback = AXCallbacks.elemDisappearedCallback retelem = None args = (retelem, self) # For some reason for the AXUIElementDestroyed notification to fire, # we need to have a reference to it first win = self.findFirst(AXRole="AXWindow", AXTitle=winName) # noqa: F841 return self.waitFor( timeout, "AXUIElementDestroyed", callback=callback, args=args, AXRole="AXWindow", AXTitle=winName, ) def waitForSheetToAppear(self, timeout=10): """Convenience method to wait for a sheet to appear. Returns: the sheet that appeared (element) or None """ return self.waitForCreation(timeout, "AXSheetCreated") def waitForValueToChange(self, timeout=10): """Convenience method to wait for value attribute of given element to change. Some types of elements (e.g. menu items) have their titles change, so this will not work for those. This seems to work best if you set the notification at the application level. Returns: Element or None """ # Want to identify that the element whose value changes matches this # object's. Unique identifiers considered include role and position # This seems to work best if you set the notification at the application # level callback = AXCallbacks.returnElemCallback retelem = None return self.waitFor( timeout, "AXValueChanged", callback=callback, args=(retelem,) ) def waitForFocusToChange(self, newFocusedElem, timeout=10): """Convenience method to wait for focused element to change (to new element given). Returns: Boolean """ return self.waitFor( timeout, "AXFocusedUIElementChanged", AXRole=newFocusedElem.AXRole, AXPosition=newFocusedElem.AXPosition, ) def waitForFocusedWindowToChange(self, nextWinName, timeout=10): """Convenience method to wait for focused window to change Returns: Boolean """ return self.waitFor( timeout, "AXFocusedWindowChanged", AXTitle=nextWinName ) def _convenienceMatch(self, role, attr, match): """Method used by role based convenience functions to find a match""" kwargs = {} # If the user supplied some text to search for, # supply that in the kwargs if match: kwargs[attr] = match return self.findAll(AXRole=role, **kwargs) def _convenienceMatchR(self, role, attr, match): """Method used by role based convenience functions to find a match""" kwargs = {} # If the user supplied some text to search for, # supply that in the kwargs if match: kwargs[attr] = match return self.findAllR(AXRole=role, **kwargs) def textAreas(self, match=None): """Return a list of text areas with an optional match parameter.""" return self._convenienceMatch("AXTextArea", "AXTitle", match) def textAreasR(self, match=None): """Return a list of text areas with an optional match parameter.""" return self._convenienceMatchR("AXTextArea", "AXTitle", match) def textFields(self, match=None): """Return a list of textfields with an optional match parameter.""" return self._convenienceMatch( "AXTextField", "AXRoleDescription", match ) def textFieldsR(self, match=None): """Return a list of textfields with an optional match parameter.""" return self._convenienceMatchR( "AXTextField", "AXRoleDescription", match ) def buttons(self, match=None): """Return a list of buttons with an optional match parameter.""" return self._convenienceMatch("AXButton", "AXTitle", match) def buttonsR(self, match=None): """Return a list of buttons with an optional match parameter.""" return self._convenienceMatchR("AXButton", "AXTitle", match) def windows(self, match=None): """Return a list of windows with an optional match parameter.""" return self._convenienceMatch("AXWindow", "AXTitle", match) def windowsR(self, match=None): """Return a list of windows with an optional match parameter.""" return self._convenienceMatchR("AXWindow", "AXTitle", match) def sheets(self, match=None): """Return a list of sheets with an optional match parameter.""" return self._convenienceMatch("AXSheet", "AXDescription", match) def sheetsR(self, match=None): """Return a list of sheets with an optional match parameter.""" return self._convenienceMatchR("AXSheet", "AXDescription", match) def staticTexts(self, match=None): """Return a list of statictexts with an optional match parameter.""" return self._convenienceMatch("AXStaticText", "AXValue", match) def staticTextsR(self, match=None): """Return a list of statictexts with an optional match parameter""" return self._convenienceMatchR("AXStaticText", "AXValue", match) def genericElements(self, match=None): """Return a list of genericelements with an optional match parameter.""" return self._convenienceMatch("AXGenericElement", "AXValue", match) def genericElementsR(self, match=None): """Return a list of genericelements with an optional match parameter.""" return self._convenienceMatchR("AXGenericElement", "AXValue", match) def groups(self, match=None): """Return a list of groups with an optional match parameter.""" return self._convenienceMatch("AXGroup", "AXRoleDescription", match) def groupsR(self, match=None): """Return a list of groups with an optional match parameter.""" return self._convenienceMatchR("AXGroup", "AXRoleDescription", match) def radioButtons(self, match=None): """Return a list of radio buttons with an optional match parameter.""" return self._convenienceMatch("AXRadioButton", "AXTitle", match) def radioButtonsR(self, match=None): """Return a list of radio buttons with an optional match parameter.""" return self._convenienceMatchR("AXRadioButton", "AXTitle", match) def popUpButtons(self, match=None): """Return a list of popup menus with an optional match parameter.""" return self._convenienceMatch("AXPopUpButton", "AXTitle", match) def popUpButtonsR(self, match=None): """Return a list of popup menus with an optional match parameter.""" return self._convenienceMatchR("AXPopUpButton", "AXTitle", match) def rows(self, match=None): """Return a list of rows with an optional match parameter.""" return self._convenienceMatch("AXRow", "AXTitle", match) def rowsR(self, match=None): """Return a list of rows with an optional match parameter.""" return self._convenienceMatchR("AXRow", "AXTitle", match) def sliders(self, match=None): """Return a list of sliders with an optional match parameter.""" return self._convenienceMatch("AXSlider", "AXValue", match) def slidersR(self, match=None): """Return a list of sliders with an optional match parameter.""" return self._convenienceMatchR("AXSlider", "AXValue", match) PKnN?UEatomacos/AXKeyCodeConstants.py# Copyright (c) 2010 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. # Special keys TAB = "" RETURN = "" SPACE = "" ESCAPE = "" CAPS_LOCK = "" DELETE = "" NUM_LOCK = "" SCROLL_LOCK = "" PAUSE = "" BACKSPACE = "" INSERT = "" # Cursor movement UP = "" DOWN = "" LEFT = "" RIGHT = "" PAGE_UP = "" PAGE_DOWN = "" HOME = "" END = "" # Numeric keypad NUM_0 = "" NUM_1 = "" NUM_2 = "" NUM_3 = "" NUM_4 = "" NUM_5 = "" NUM_6 = "" NUM_7 = "" NUM_8 = "" NUM_9 = "" NUM_ENTER = "" NUM_PERIOD = "" NUM_PLUS = "" NUM_MINUS = "" NUM_MULTIPLY = "" NUM_DIVIDE = "" # Function keys F1 = "" F2 = "" F3 = "" F4 = "" F5 = "" F6 = "" F7 = "" F8 = "" F9 = "" F10 = "" F11 = "" F12 = "" # Modifier keys COMMAND_L = "" SHIFT_L = "" OPTION_L = "" CONTROL_L = "" COMMAND_R = "" SHIFT_R = "" OPTION_R = "" CONTROL_R = "" # Default modifier keys -> left: COMMAND = COMMAND_L SHIFT = SHIFT_L OPTION = OPTION_L CONTROL = CONTROL_L # Define a dictionary representing characters mapped to their virtual key codes # Lifted from the mappings found in kbdptr.h in the osxvnc project # Mapping is: character -> virtual keycode for each character / symbol / key # as noted below US_keyboard = { # Letters "a": 0, "b": 11, "c": 8, "d": 2, "e": 14, "f": 3, "g": 5, "h": 4, "i": 34, "j": 38, "k": 40, "l": 37, "m": 46, "n": 45, "o": 31, "p": 35, "q": 12, "r": 15, "s": 1, "t": 17, "u": 32, "v": 9, "w": 13, "x": 7, "y": 16, "z": 6, # Numbers "0": 29, "1": 18, "2": 19, "3": 20, "4": 21, "5": 23, "6": 22, "7": 26, "8": 28, "9": 25, # Symbols "!": 18, "@": 19, "#": 20, "$": 21, "%": 23, "^": 22, "&": 26, "*": 28, "(": 25, ")": 29, "-": 27, # Dash "_": 27, # Underscore "=": 24, "+": 24, "`": 50, # Backtick "~": 50, "[": 33, "]": 30, "{": 33, "}": 30, ";": 41, ":": 41, "'": 39, '"': 39, ",": 43, "<": 43, ".": 47, ">": 47, "/": 44, "?": 44, "\\": 42, "|": 42, # Pipe TAB: 48, # Tab: Shift-Tab sent for Tab SPACE: 49, " ": 49, # Space # Characters that on the US keyboard require use with Shift "upperSymbols": [ "!", "@", "#", "$", "%", "^", "&", "*", "(", ")", "_", "+", "~", "{", "}", ":", '"', "<", ">", "?", "|", ], } # Mapping for special (meta) keys specialKeys = { # Special keys RETURN: 36, DELETE: 117, TAB: 48, SPACE: 49, ESCAPE: 53, CAPS_LOCK: 57, NUM_LOCK: 71, SCROLL_LOCK: 107, PAUSE: 113, BACKSPACE: 51, INSERT: 114, # Cursor movement UP: 126, DOWN: 125, LEFT: 123, RIGHT: 124, PAGE_UP: 116, PAGE_DOWN: 121, # Numeric keypad NUM_0: 82, NUM_1: 83, NUM_2: 84, NUM_3: 85, NUM_4: 86, NUM_5: 87, NUM_6: 88, NUM_7: 89, NUM_8: 91, NUM_9: 92, NUM_ENTER: 76, NUM_PERIOD: 65, NUM_PLUS: 69, NUM_MINUS: 78, NUM_MULTIPLY: 67, NUM_DIVIDE: 75, # Function keys F1: 122, F2: 120, F3: 99, F4: 118, F5: 96, F6: 97, F7: 98, F8: 100, F9: 101, F10: 109, F11: 103, F12: 111, # Modifier keys COMMAND_L: 55, SHIFT_L: 56, OPTION_L: 58, CONTROL_L: 59, COMMAND_R: 54, SHIFT_R: 60, OPTION_R: 61, CONTROL_R: 62, } # Default keyboard layout DEFAULT_KEYBOARD = US_keyboard PKnN5 33atomacos/AXKeyboard.py# Copyright (c) 2010 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. import Quartz from atomacos.AXKeyCodeConstants import ( COMMAND, SHIFT, OPTION, CONTROL, DEFAULT_KEYBOARD, specialKeys, ) # Based on the flags provided in the Quartz documentation it does not seem # that we can distinguish between left and right modifier keys, even though # there are different virtual key codes offered between the two sets. # Thus for now we offer only a generic modifier key set w/o L-R distinction. modKeyFlagConstants = { COMMAND: Quartz.kCGEventFlagMaskCommand, SHIFT: Quartz.kCGEventFlagMaskShift, OPTION: Quartz.kCGEventFlagMaskAlternate, CONTROL: Quartz.kCGEventFlagMaskControl, } def loadKeyboard(): """Load a given keyboard mapping (of characters to virtual key codes). Default is US keyboard Parameters: None (relies on the internationalization settings) Returns: A dictionary representing the current keyboard mapping (of characters to keycodes) """ # Currently assumes US keyboard keyboard_layout = DEFAULT_KEYBOARD keyboard_layout.update(specialKeys) return keyboard_layout PKnNx+Satomacos/Clipboard.py# Copyright (c) 2010 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. import types import AppKit import pprint import logging import Foundation class Clipboard(object): """Class to represent clipboard-related operations for text""" # String encoding type utf8encoding = Foundation.NSUTF8StringEncoding # Class attributes to distinguish types of data: # Reference: # http://developer.apple.com/mac/library/documentation/Cocoa/Reference/ # ApplicationKit/Classes/NSPasteboard_Class/Reference/Reference.html # Text data type STRING = AppKit.NSStringPboardType # Rich-text format data type (e.g. rtf documents) RTF = AppKit.NSRTFPboardType # Image datatype (e.g. tiff) IMAGE = AppKit.NSTIFFPboardType # URL data type (not just web but file locations also) URL = AppKit.NSURLPboardType # Color datatype - not sure if we'll have to use this one # Supposedly replaced in 10.6 but the pyobjc AppKit module doesn't have the # new data type as an attribute COLOR = AppKit.NSColorPboardType # You can extend this list of data types # e.g. File copy and paste between host and guest # Not sure if text copy and paste between host and guest falls under STRING/ # RTF or not # List of PboardTypes I found in AppKit: # NSColorPboardType # NSCreateFileContentsPboardType # NSCreateFilenamePboardType # NSDragPboard # NSFileContentsPboardType # NSFilenamesPboardType # NSFilesPromisePboardType # NSFindPanelSearchOptionsPboardType # NSFindPboard # NSFontPboard # NSFontPboardType # NSGeneralPboard # NSHTMLPboardType # NSInkTextPboardType # NSMultipleTextSelectionPboardType # NSPDFPboardType # NSPICTPboardType # NSPostScriptPboardType # NSRTFDPboardType # NSRTFPboardType # NSRulerPboard # NSRulerPboardType # NSSoundPboardType # NSStringPboardType # NSTIFFPboardType # NSTabularTextPboardType # NSURLPboardType # NSVCardPboardType @classmethod def paste(cls): """Get the clipboard data ('Paste'). Returns: Data (string) retrieved or None if empty. Exceptions from AppKit will be handled by caller. """ pb = AppKit.NSPasteboard.generalPasteboard() # If we allow for multiple data types (e.g. a list of data types) # we will have to add a condition to check just the first in the # list of datatypes) data = pb.stringForType_(cls.STRING) return data @classmethod def copy(cls, data): """Set the clipboard data ('Copy'). Parameters: data to set (string) Optional: datatype if it's not a string Returns: True / False on successful copy, Any exception raised (like passes the NSPasteboardCommunicationError) should be caught by the caller. """ pp = pprint.PrettyPrinter() copy_data = "Data to copy (put in pasteboard): %s" logging.debug(copy_data % pp.pformat(data)) # Clear the pasteboard first: cleared = cls.clearAll() if not cleared: logging.warning("Clipboard could not clear properly") return False # Prepare to write the data # If we just use writeObjects the sequence to write to the clipboard is # a) Call clearContents() # b) Call writeObjects() with a list of objects to write to the # clipboard if not isinstance(data, types.ListType): data = [data] pb = AppKit.NSPasteboard.generalPasteboard() pb_set_ok = pb.writeObjects_(data) return bool(pb_set_ok) @classmethod def clearContents(cls): """Clear contents of general pasteboard. Future enhancement can include specifying which clipboard to clear Returns: True on success; caller should expect to catch exceptions, probably from AppKit (ValueError) """ log_msg = "Request to clear contents of pasteboard: general" logging.debug(log_msg) pb = AppKit.NSPasteboard.generalPasteboard() pb.clearContents() return True @classmethod def clearProperties(cls): """Clear properties of general pasteboard. Future enhancement can include specifying which clipboard's properties to clear Returns: True on success; caller should catch exceptions raised, e.g. from AppKit (ValueError) """ log_msg = "Request to clear properties of pasteboard: general" logging.debug(log_msg) pb = AppKit.NSPasteboard.generalPasteboard() pb.clearProperties() return True @classmethod def clearAll(cls): """Clear contents and properties of general pasteboard. Future enhancement can include specifying which clipboard's properties to clear Returns: Boolean True on success; caller should handle exceptions """ cls.clearContents() cls.clearProperties() return True @classmethod def isEmpty(cls, datatype=None): """Method to test if the general pasteboard is empty or not with respect to the type of object you want. Parameters: datatype (defaults to strings) Returns: Boolean True (empty) / False (has contents); Raises exception (passes any raised up) """ if not datatype: datatype = AppKit.NSString if not isinstance(datatype, types.ListType): datatype = [datatype] pp = pprint.PrettyPrinter() logging.debug("Desired datatypes: %s" % pp.pformat(datatype)) opt_dict = {} logging.debug("Results filter is: %s" % pp.pformat(opt_dict)) try: log_msg = "Request to verify pasteboard is empty" logging.debug(log_msg) pb = AppKit.NSPasteboard.generalPasteboard() # canReadObjectForClasses_options_() seems to return an int (> 0 if # True) # Need to negate to get the sense we want (True if can not read the # data type from the pasteboard) its_empty = not bool( pb.canReadObjectForClasses_options_(datatype, opt_dict) ) except ValueError as error: logging.exception(error) raise return bool(its_empty) PKnN''atomacos/Prefs.py# -*- coding: utf-8 -*- # Copyright (c) 2011 Julián Romero. # This file is part of ATOMac. # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. from future import standard_library from AppKit import NSWorkspace, NSUserDefaults, NSDictionary from os import path try: from collections import UserDict except ImportError: from UserDict import UserDict standard_library.install_aliases() __all__ = ["Prefs"] class Prefs(UserDict): """NSUserDefaults proxy to read/write application preferences. It has been conceived to prepare the preferences before a test launch the app. Once a Prefs instance is created, it doesn't detect prefs changed elsewhere, so for now you need to create the instance right before reading/writing a pref. Defaults.plist with default values is expected to exist on the app bundle. p = Prefs('com.example.App') coolStuff = p['CoolStuff'] p['CoolStuff'] = newCoolStuff """ def __init__( self, bundleID, bundlePath=None, defaultsPlistName="Defaults" ): """ bundleId: the application bundle identifier bundlePath: the full bundle path (useful to test a Debug build) defaultsPlistName: the name of the plist that contains default values """ self.__bundleID = bundleID self.__bundlePath = bundlePath UserDict.__init__(self) self.__setup(defaultsPlistName) def __setup(self, defaultsPlistName=None): NSUserDefaults.resetStandardUserDefaults() prefs = NSUserDefaults.standardUserDefaults() self.defaults = self.__defaults(defaultsPlistName) domainData = prefs.persistentDomainForName_(self.__bundleID) if domainData: self.data = domainData else: self.data = NSDictionary.dictionary() def __defaults(self, plistName="Defaults"): if self.__bundlePath is None: self.__bundlePath = NSWorkspace.sharedWorkspace().absolutePathForAppBundleWithIdentifier_( # noqa:: B950 self.__bundleID ) if self.__bundlePath: plistPath = path.join( self.__bundlePath, "Contents/Resources/%s.plist" % plistName ) plist = NSDictionary.dictionaryWithContentsOfFile_(plistPath) if plist: return plist return NSDictionary.dictionary() def get(self, key): return self.__getitem__(key) def __getitem__(self, key): result = self.data.get(key, None) if result is None or result == "": if self.defaults: result = self.defaults.get(key, None) return result def set(self, key, value): self.__setitem__(key, value) def __setitem__(self, key, value): mutableData = self.data.mutableCopy() mutableData[key] = value self.data = mutableData prefs = NSUserDefaults.standardUserDefaults() prefs.setPersistentDomain_forName_(self.data, self.__bundleID) PKnN״atomacos/__init__.py"""Automated Testing on macOS""" # flake8: noqa: F401 __version__ = "0.1.1" from atomacos import a11y from atomacos import errors from atomacos.AXClasses import NativeUIElement from atomacos.Clipboard import Clipboard from atomacos.Prefs import Prefs Error = errors.AXError ErrorAPIDisabled = errors.AXErrorAPIDisabled ErrorInvalidUIElement = errors.AXErrorInvalidUIElement ErrorCannotComplete = errors.AXErrorCannotComplete ErrorUnsupported = errors.AXErrorUnsupported ErrorNotImplemented = errors.AXErrorNotImplemented getAppRefByLocalizedName = NativeUIElement.getAppRefByLocalizedName terminateAppByBundleId = NativeUIElement.terminateAppByBundleId launchAppByBundlePath = NativeUIElement.launchAppByBundlePath setSystemWideTimeout = NativeUIElement.setSystemWideTimeout getAppRefByBundleId = NativeUIElement.getAppRefByBundleId launchAppByBundleId = NativeUIElement.launchAppByBundleId getFrontmostApp = NativeUIElement.getFrontmostApp getAppRefByPid = NativeUIElement.getAppRefByPid PKnN> 600atomacos/a11y.pyimport fnmatch import logging import AppKit from AppKit import NSURL from PyObjCTools import AppHelper from atomacos import converter from atomacos.errors import ( AXErrorUnsupported, raise_ax_error, AXErrorIllegalArgument, AXErrorCannotComplete, AXErrorAPIDisabled, AXErrorNotImplemented, ) from ApplicationServices import ( AXUIElementCreateApplication, AXUIElementCreateSystemWide, AXUIElementCopyAttributeValue, AXUIElementCopyAttributeNames, AXUIElementCopyActionNames, AXUIElementCopyElementAtPosition, AXUIElementGetPid, AXUIElementIsAttributeSettable, AXUIElementSetAttributeValue, AXUIElementPerformAction, AXUIElementSetMessagingTimeout, kAXErrorSuccess, CFEqual, NSWorkspace, AXIsProcessTrusted, ) logger = logging.getLogger(__name__) class AXUIElement(object): def __init__(self, ref=None): self.ref = ref self.converter = converter.Converter(self.__class__) def __repr__(self): """Build a descriptive string for UIElements.""" c = repr(self.__class__).partition("")[0] _attributes = self.ax_attributes for element_describer in ("AXTitle", "AXValue", "AXRoleDescription"): if element_describer in _attributes: title = getattr(self, element_describer) break else: title = "" if "AXRole" in _attributes: role = self.AXRole else: role = "" if len(title) > 20: title = title[:20] + "...'" return "<%s %s %s>" % (c, role, title) def __getattr__(self, item): if item in self.ax_attributes: return self._get_ax_attribute(item) elif item in self.ax_actions: def perform_ax_action(): self._perform_ax_actions(item) return perform_ax_action else: raise AttributeError( "'%s' object has no attribute '%s'" % (type(self), item) ) def __setattr__(self, key, value): if key.startswith("AX"): try: if key in self.ax_attributes: self._set_ax_attribute(key, value) except AXErrorIllegalArgument: pass else: super(AXUIElement, self).__setattr__(key, value) def __dir__(self): return ( self.ax_attributes + self.ax_actions + list(self.__dict__.keys()) ) def _get_ax_attribute(self, item): """Get the value of the the specified attribute""" if item in self.ax_attributes: err, attrValue = AXUIElementCopyAttributeValue( self.ref, item, None ) return self.converter.convert_value(attrValue) else: raise AttributeError( "'%s' object has no attribute '%s'" % (type(self), item) ) def _set_ax_attribute(self, name, value): """ Set the specified attribute to the specified value """ self._get_ax_attribute(name) err, to_set = AXUIElementCopyAttributeValue(self.ref, name, None) if err != kAXErrorSuccess: raise_ax_error(err, "Error retrieving attribute to set") err, settable = AXUIElementIsAttributeSettable(self.ref, name, None) if err != kAXErrorSuccess: raise_ax_error(err, "Error querying attribute") if not settable: raise AXErrorUnsupported("Attribute is not settable") err = AXUIElementSetAttributeValue(self.ref, name, value) if err != kAXErrorSuccess: raise_ax_error(err, "Error setting attribute value") @property def ax_attributes(self): """ Get a list of attributes available on the AXUIElement """ err, attr = AXUIElementCopyAttributeNames(self.ref, None) if err != kAXErrorSuccess: logger.warning("Error retrieving attribute list. %s" % err) return [] else: return list(attr) @property def ax_actions(self): """ Get a list of actions available on the AXUIElement """ err, actions = AXUIElementCopyActionNames(self.ref, None) if err != kAXErrorSuccess: logger.warning("Error retrieving action names. %s" % err) return [] else: return list(actions) def _perform_ax_actions(self, name): err = AXUIElementPerformAction(self.ref, name) if err != kAXErrorSuccess: raise_ax_error(err, "Error performing requested action") @property def pid(self): error_code, pid = AXUIElementGetPid(self.ref, None) if error_code != kAXErrorSuccess: raise_ax_error(error_code, "Error retrieving PID") return pid @classmethod def from_pid(cls, pid): """ Get an AXUIElement reference to the application by specified PID. """ app_ref = AXUIElementCreateApplication(pid) if app_ref is None: raise AXErrorUnsupported("Error getting app ref") return cls(ref=app_ref) @classmethod def systemwide(cls): """Get an AXUIElement reference for the system accessibility object.""" app_ref = AXUIElementCreateSystemWide() if app_ref is None: raise AXErrorUnsupported("Error getting a11y object") return cls(ref=app_ref) @classmethod def from_bundle_id(cls, bundle_id): """ Get the top level element for the application with the specified bundle ID, such as com.vmware.fusion. """ ra = AppKit.NSRunningApplication # return value (apps) is always an array. if there is a match it will # have an item, otherwise it won't. apps = ra.runningApplicationsWithBundleIdentifier_(bundle_id) if len(apps) == 0: raise ValueError( ( "Specified bundle ID not found in " "running apps: %s" % bundle_id ) ) pid = apps[0].processIdentifier() return cls.from_pid(pid) @classmethod def from_localized_name(cls, name): """Get the top level element for the application with the specified localized name, such as VMware Fusion. Wildcards are also allowed. """ # Refresh the runningApplications list apps = get_running_apps() for app in apps: if fnmatch.fnmatch(app.localizedName(), name): pid = app.processIdentifier() return cls.from_pid(pid) raise ValueError("Specified application not found in running apps.") @classmethod def frontmost(cls): """Get the current frontmost application. Raise a ValueError exception if no GUI applications are found. """ # Refresh the runningApplications list apps = get_running_apps() for app in apps: pid = app.processIdentifier() ref = cls.from_pid(pid) try: if ref.AXFrontmost: return ref except ( AXErrorUnsupported, AXErrorCannotComplete, AXErrorAPIDisabled, AXErrorNotImplemented, ): # Some applications do not have an explicit GUI # and so will not have an AXFrontmost attribute # Trying to read attributes from Google Chrome Helper returns # ErrorAPIDisabled for some reason - opened radar bug 12837995 pass raise ValueError("No GUI application found.") @classmethod def with_window(cls): """Get a random app that has windows. Raise a ValueError exception if no GUI applications are found. """ # Refresh the runningApplications list apps = get_running_apps() for app in apps: pid = app.processIdentifier() ref = cls.from_pid(pid) if hasattr(ref, "windows") and len(ref.windows()) > 0: return ref raise ValueError("No GUI application found.") def __eq__(self, other): if not isinstance(other, type(self)): return False if self.ref is None and other.ref is None: return True if self.ref is None or other.ref is None: return False return CFEqual(self.ref, other.ref) def __ne__(self, other): return not self.__eq__(other) def get_element_at_position(self, x, y): if self.ref is None: raise AXErrorUnsupported( "Operation not supported on null element references" ) err, res = AXUIElementCopyElementAtPosition(self.ref, x, y, None) if err != kAXErrorSuccess: try: raise_ax_error(err, "Unable to get element at position") except AXErrorIllegalArgument: raise ValueError("Arguments must be two floats.") return self.__class__(res) @staticmethod def launch_app_by_bundle_id(bundle_id): """Launch the application with the specified bundle ID""" # NSWorkspaceLaunchAllowingClassicStartup does nothing on any # modern system that doesn't have the classic environment installed. # Encountered a bug when passing 0 for no options on 10.6 PyObjC. ws = AppKit.NSWorkspace.sharedWorkspace() # Sorry about the length of the following line r = ws.launchAppWithBundleIdentifier_options_additionalEventParamDescriptor_launchIdentifier_( # noqa: B950 bundle_id, AppKit.NSWorkspaceLaunchAllowingClassicStartup, AppKit.NSAppleEventDescriptor.nullDescriptor(), None, ) # On 10.6, this returns a tuple - first element bool result, second is # a number. Let's use the bool result. if not r[0]: raise RuntimeError( "Error launching specified application. %s" % str(r) ) @staticmethod def launch_app_by_bundle_path(bundle_path, arguments=None): """Launch app with a given bundle path. Return True if succeed. """ if arguments is None: arguments = [] bundleUrl = NSURL.fileURLWithPath_(bundle_path) workspace = AppKit.NSWorkspace.sharedWorkspace() configuration = { AppKit.NSWorkspaceLaunchConfigurationArguments: arguments } return workspace.launchApplicationAtURL_options_configuration_error_( bundleUrl, AppKit.NSWorkspaceLaunchAllowingClassicStartup, configuration, None, ) @staticmethod def terminate_app_by_bundle_id(bundle_id): """Terminate app with a given bundle ID. Requires 10.6. Return True if succeed. """ ra = AppKit.NSRunningApplication appList = ra.runningApplicationsWithBundleIdentifier_(bundle_id) if appList: app = appList[0] return app and app.terminate() return False def set_timeout(self, timeout): if self.ref is None: raise AXErrorUnsupported( "Operation not supported on null element references" ) err = AXUIElementSetMessagingTimeout(self.ref, timeout) try: raise_ax_error(err, "The element reference is invalid") except AXErrorIllegalArgument: raise ValueError( "Accessibility timeout values must be non-negative" ) def get_frontmost_pid(): """Return the PID of the application in the foreground.""" frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() pid = frontmost_app.processIdentifier() return pid def axenabled(): """Return the status of accessibility on the system.""" return AXIsProcessTrusted() def get_running_apps(): """Get a list of the running applications.""" AppHelper.callLater(1, AppHelper.stopEventLoop) AppHelper.runConsoleEventLoop() # Get a list of running applications ws = AppKit.NSWorkspace.sharedWorkspace() apps = ws.runningApplications() return apps PKnN!Satomacos/converter.pyfrom collections import namedtuple from CoreFoundation import CFGetTypeID, CFArrayGetTypeID, CFStringGetTypeID from ApplicationServices import ( AXUIElementGetTypeID, AXValueGetType, kAXValueCGSizeType, kAXValueCGPointType, kAXValueCFRangeType, NSSizeFromString, NSPointFromString, NSRangeFromString, ) import re class Converter: def __init__(self, axuielementclass=None): self.app_ref_class = axuielementclass def convert_value(self, value): if value is None: return value if CFGetTypeID(value) == CFStringGetTypeID(): try: return str(value) except UnicodeError: return value.encode("utf-8") if CFGetTypeID(value) == AXUIElementGetTypeID(): return self.convert_app_ref(value) if CFGetTypeID(value) == CFArrayGetTypeID(): return self.convert_list(value) if AXValueGetType(value) == kAXValueCGSizeType: return self.convert_size(value) if AXValueGetType(value) == kAXValueCGPointType: return self.convert_point(value) if AXValueGetType(value) == kAXValueCFRangeType: return self.convert_range(value) else: return value def convert_list(self, value): return [self.convert_value(item) for item in value] def convert_app_ref(self, value): return self.app_ref_class(ref=value) def convert_size(self, value): repr_searched = re.search("{.*}", str(value)).group() CGSize = namedtuple("CGSize", ["width", "height"]) size = NSSizeFromString(repr_searched) return CGSize(size.width, size.height) def convert_point(self, value): repr_searched = re.search("{.*}", str(value)).group() CGPoint = namedtuple("CGPoint", ["x", "y"]) point = NSPointFromString(repr_searched) return CGPoint(point.x, point.y) def convert_range(self, value): repr_searched = re.search("{.*}", str(value)).group() CFRange = namedtuple("CFRange", ["location", "length"]) range = NSRangeFromString(repr_searched) return CFRange(range.location, range.length) PKnN*atomacos/errors.pyfrom ApplicationServices import ( kAXErrorAPIDisabled, kAXErrorInvalidUIElement, kAXErrorCannotComplete, kAXErrorNotImplemented, kAXErrorIllegalArgument, kAXErrorActionUnsupported, ) class AXError(Exception): pass class AXErrorUnsupported(AXError): pass class AXErrorAPIDisabled(AXError): pass class AXErrorInvalidUIElement(AXError): pass class AXErrorCannotComplete(AXError): pass class AXErrorNotImplemented(AXError): pass class AXErrorIllegalArgument(AXError): pass class AXErrorActionUnsupported(AXError): pass def raise_ax_error(code, message): """ Raises an error with given message based on given error code. Defaults to AXErrorUnsupported for unknown codes. """ CODE_TO_AXERROR = { kAXErrorAPIDisabled: AXErrorAPIDisabled, kAXErrorInvalidUIElement: AXErrorInvalidUIElement, kAXErrorCannotComplete: AXErrorCannotComplete, kAXErrorNotImplemented: AXErrorNotImplemented, kAXErrorIllegalArgument: AXErrorIllegalArgument, kAXErrorActionUnsupported: AXErrorActionUnsupported, } ErrorFromCode = CODE_TO_AXERROR.get(code, AXErrorUnsupported) raise ErrorFromCode("%s (AX Error %s)" % (message, code)) PKnN atomacos/notification.pyimport logging import signal import objc from ApplicationServices import ( AXObserverCreate, AXObserverGetRunLoopSource, AXObserverRemoveNotification, AXObserverAddNotification, kAXErrorSuccess, NSDefaultRunLoopMode, ) from CoreFoundation import CFRunLoopGetCurrent, CFRunLoopAddSource from PyObjCTools import AppHelper, MachSignals from atomacos import errors logger = logging.getLogger(__name__) def _sigHandler(sig): AppHelper.stopEventLoop() raise KeyboardInterrupt("Keyboard interrupted Run Loop") class Observer: def __init__(self, uielement=None): self.ref = uielement self.callback = None self.callback_result = None def set_notification( self, timeout=0, notification_name=None, callbackFn=None, callbackArgs=None, callbackKwargs=None, ): if callable(callbackFn): self.callbackFn = callbackFn if isinstance(callbackArgs, tuple): self.callbackArgs = callbackArgs else: self.callbackArgs = tuple() if isinstance(callbackKwargs, dict): self.callbackKwargs = callbackKwargs else: self.callbackKwargs = dict() self.callback_result = None self.timedout = True @objc.callbackFor(AXObserverCreate) def _observer_callback(observer, element, notification, refcon): if self.callbackFn is not None: ret_element = self.ref.__class__(element) if ret_element is None: raise RuntimeError("Could not create new AX UI Element.") callback_args = (ret_element,) + self.callbackArgs callback_result = self.callbackFn( *callback_args, **self.callbackKwargs ) if callback_result is None: raise RuntimeError("Python callback failed.") if callback_result in (-1, 1): self.timedout = False AppHelper.stopEventLoop() self.callback_result = callback_result else: self.timedout = False AppHelper.stopEventLoop() err, observer = AXObserverCreate( self.ref.pid, _observer_callback, None ) if err != kAXErrorSuccess: errors.raise_ax_error( err, "Could not create observer for notification" ) err = AXObserverAddNotification( observer, self.ref.ref, notification_name, id(self.ref.ref) ) if err != kAXErrorSuccess: errors.raise_ax_error( err, "Could not add notification to observer" ) # Add observer source to run loop CFRunLoopAddSource( CFRunLoopGetCurrent(), AXObserverGetRunLoopSource(observer), NSDefaultRunLoopMode, ) # Set the signal handlers prior to running the run loop oldSigIntHandler = MachSignals.signal(signal.SIGINT, _sigHandler) AppHelper.callLater(timeout, AppHelper.stopEventLoop) AppHelper.runConsoleEventLoop() MachSignals.signal(signal.SIGINT, oldSigIntHandler) err = AXObserverRemoveNotification( observer, self.ref.ref, notification_name ) if err != kAXErrorSuccess: errors.raise_ax_error( err, "Could not remove notification from observer" ) return self.callback_result PKnN-.M.Matomacos/ldtp/__init__.py# Copyright (c) 2013 Nagappan Alagappan All Rights Reserved. # This file is part of ATOMac. # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009-14 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Main routines for LDTP""" import os import re import time import atexit import socket import logging import datetime import tempfile import warnings import threading import traceback from base64 import b64decode from fnmatch import translate as glob_trans from . import state from . import client from .client_exception import LdtpExecutionError _t = None _pollEvents = None _file_logger = None _ldtp_debug = client._ldtp_debug _ldtp_windows_env = client._ldtp_windows_env if "LDTP_DEBUG" in os.environ: _ldtp_debug = os.environ["LDTP_DEBUG"] def setHost(host): client._client.setHost(host) def whoismyhost(): return client._client._ServerProxy__host LDTP_LOG_MEMINFO = 60 LDTP_LOG_CPUINFO = 61 logging.addLevelName(LDTP_LOG_MEMINFO, "MEMINFO") logging.addLevelName(LDTP_LOG_CPUINFO, "CPUINFO") # Add handler to root logger logger = logging.getLogger("") def addloghandler(handler): """ Add custom log handler @param handler: Handler instance @type handler: object @return: 1 on success and 0 on error @rtype: integer """ logger.addHandler(handler) return 1 def removeloghandler(handler): """ Remove custom log handler @param handler: Handler instance @type handler: object @return: 1 on success and 0 on error @rtype: integer """ logger.removeHandler(handler) return 1 def log(message, level=logging.DEBUG): """ Logs the message in the root logger with the log level @param message: Message to be logged @type message: string @param level: Log level, defaul DEBUG @type level: integer @return: 1 on success and 0 on error @rtype: integer """ if _ldtp_debug: print(message) logger.log(level, str(message)) return 1 def startlog(filename, overwrite=True): """ @param filename: Start logging on the specified file @type filename: string @param overwrite: Overwrite or append False - Append log to an existing file True - Write log to a new file. If file already exist, then erase existing file content and start log @type overwrite: boolean @return: 1 on success and 0 on error @rtype: integer """ if not filename: return 0 if overwrite: # Create new file, by overwriting existing file _mode = "w" else: # Append existing file _mode = "a" global _file_logger # Create logging file handler _file_logger = logging.FileHandler(os.path.expanduser(filename), _mode) # Log 'Levelname: Messages', eg: 'ERROR: Logged message' _formatter = logging.Formatter("%(levelname)-8s: %(message)s") _file_logger.setFormatter(_formatter) logger.addHandler(_file_logger) if _ldtp_debug: # On debug, change the default log level to DEBUG _file_logger.setLevel(logging.DEBUG) else: # else log in case of ERROR level and above _file_logger.setLevel(logging.ERROR) return 1 def stoplog(): """ Stop logging. @return: 1 on success and 0 on error @rtype: integer """ global _file_logger if _file_logger: logger.removeHandler(_file_logger) _file_logger = None return 1 class PollLogs(threading.Thread): """ Class to poll logs, NOTE: *NOT* for external use """ sleep_time = 0.1 global _file_logger def __init__(self): super(PollLogs, self).__init__() self.alive = True def __del__(self): """ Stop polling when destroying this class """ try: self.alive = False except: pass def stop(self): """ Stop the thread """ try: self.alive = False self.join(self.sleep_time) except: pass def run(self): while self.alive: try: if not self.poll_server(): # Socket error break except: log(traceback.format_exc()) self.alive = False break def poll_server(self): if not logger.handlers: # If no handlers registered don't call the getlastlog # as it will flush out all the logs time.sleep(self.sleep_time) return True try: message = getlastlog() except socket.error: t = traceback.format_exc() log(t) # Connection to server might be failed return False except: t = traceback.format_exc() log(t) pass if not message: # No log in queue, sleep a second time.sleep(self.sleep_time) return True # Split message type and message message_type, message = re.split("-", message, 1) if re.match("MEMINFO", message_type, re.I): level = LDTP_LOG_MEMINFO elif re.match("CPUINFO", message_type, re.I): level = LDTP_LOG_CPUINFO elif re.match("INFO", message_type, re.I): level = logging.INFO elif re.match("WARNING", message_type, re.I): level = logging.WARNING elif re.match("ERROR", message_type, re.I): level = logging.ERROR elif re.match("CRITICAL", message_type, re.I): level = logging.CRITICAL else: level = logging.DEBUG # Log the messsage with the attained level log(message, level) return True def logFailures(*args): # Do nothing. For backward compatability warnings.warn( "Use Mago framework - http://mago.ubuntu.com", DeprecationWarning ) def _populateNamespace(d): for method in client._client.system.listMethods(): if method.startswith("system."): continue if method in d: local_name = "_remote_" + method else: local_name = method d[local_name] = getattr(client._client, method) d[local_name].__doc__ = client._client.system.methodHelp(method) class PollEvents(threading.Thread): """ Class to poll callback events, NOTE: *NOT* for external use """ def __init__(self): super(PollEvents, self).__init__() self.alive = True # Initialize callback dictionary self._callback = {} def __del__(self): """ Stop callback when destroying this class """ try: self.alive = False except: pass def stop(self): """ Stop the thread """ try: self.alive = False self.join(self.sleep_time) except: pass def run(self): while self.alive: try: if not self.poll_server(): # Socket error break except: log(traceback.format_exc()) self.alive = False break def poll_server(self): if not self._callback: # If callback not registered, don't proceed further # Sleep a sleep_time and then return time.sleep(self.sleep_time) return True try: event = poll_events() except socket.error: log(traceback.format_exc()) # Connection to server might be failed return False if not event: # No event in queue, sleep a sleep_time time.sleep(self.sleep_time) return True # Event format: # window:create-Untitled Document 1 - gedit event = event.split("-", 1) # Split first - data = event[1] # Rest of data event_type = event[0] # event type # self._callback[name][0] - Event type # self._callback[name][1] - Callback function # self._callback[name][2] - Arguments to callback function for name in self._callback: # Window created event # User registered window events # Keyboard event if ( ( event_type == "onwindowcreate" and re.match(glob_trans(name), data, re.M | re.U | re.L) ) or ( event_type != "onwindowcreate" and self._callback[name][0] == event_type ) or event_type == "kbevent" ): if event_type == "kbevent": # Special case keys, modifiers = data.split("-") fname = "kbevent%s%s" % (keys, modifiers) else: fname = name # Get the callback function callback = self._callback[fname][1] if not callable(callback): # If not callable, ignore the event continue args = self._callback[fname][2] try: if len(args) and args[0]: # Spawn a new thread, for each event # If one or more arguments to the callback function _t = threading.Thread(target=callback, args=args) else: # Spawn a new thread, for each event # No arguments to the callback function _t = threading.Thread(target=callback, args=()) _t.start() except: # Log trace incase of exception log(traceback.format_exc()) # Silently ignore !?! any exception thrown pass # When multiple kb events registered, the for # loop keeps iterating, so just break the loop break return True def imagecapture( window_name=None, out_file=None, x=0, y=0, width=None, height=None ): """ Captures screenshot of the whole desktop or given window @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param x: x co-ordinate value @type x: integer @param y: y co-ordinate value @type y: integer @param width: width co-ordinate value @type width: integer @param height: height co-ordinate value @type height: integer @return: screenshot filename @rtype: string """ if not out_file: out_file = tempfile.mktemp(".png", "ldtp_") else: out_file = os.path.expanduser(out_file) ### Windows compatibility if _ldtp_windows_env: if width == None: width = -1 if height == None: height = -1 if window_name == None: window_name = "" ### Windows compatibility - End data = _remote_imagecapture(window_name, x, y, width, height) f = open(out_file, "wb") f.write(b64decode(data)) f.close() return out_file def wait(timeout=5): return _remote_wait(timeout) def waittillguiexist(window_name, object_name="", guiTimeOut=30, state=""): return _remote_waittillguiexist(window_name, object_name, guiTimeOut) def waittillguinotexist(window_name, object_name="", guiTimeOut=30, state=""): return _remote_waittillguinotexist(window_name, object_name, guiTimeOut) def guiexist(window_name, object_name=""): return _remote_guiexist(window_name, object_name) def launchapp(cmd, args=[], delay=0, env=1, lang="C"): return _remote_launchapp(cmd, args, delay, env, lang) def hasstate(window_name, object_name, state, guiTimeOut=0): return _remote_hasstate(window_name, object_name, state, guiTimeOut) def selectrow(window_name, object_name, row_text): return _remote_selectrow(window_name, object_name, row_text, False) def multiselect(window_name, object_name, row_text): return _remote_multiselect(window_name, object_name, row_text, False) def multiremove(window_name, object_name, row_text): return _remote_multiremove(window_name, object_name, row_text, False) def doesrowexist(window_name, object_name, row_text, partial_match=False): return _remote_doesrowexist( window_name, object_name, row_text, partial_match ) def getchild(window_name, child_name="", role="", parent=""): return _remote_getchild(window_name, child_name, role, parent) def enterstring(window_name, object_name="", data=""): return _remote_enterstring(window_name, object_name, data) def setvalue(window_name, object_name, data): return _remote_setvalue(window_name, object_name, float(data)) def grabfocus(window_name, object_name=""): # On Linux just with window name, grab focus doesn't work # So, we can't make this call generic return _remote_grabfocus(window_name, object_name) def copytext(window_name, object_name, start, end=-1): return _remote_copytext(window_name, object_name, start, end) def cuttext(window_name, object_name, start, end=-1): return _remote_cuttext(window_name, object_name, start, end) def deletetext(window_name, object_name, start, end=-1): return _remote_deletetext(window_name, object_name, start, end) def startprocessmonitor(process_name, interval=2): return _remote_startprocessmonitor(process_name, interval) def gettextvalue(window_name, object_name, startPosition=0, endPosition=0): return _remote_gettextvalue( window_name, object_name, startPosition, endPosition ) def getcellvalue(window_name, object_name, row_index, column=0): return _remote_getcellvalue(window_name, object_name, row_index, column) def getcellsize(window_name, object_name, row_index, column=0): return _remote_getcellsize(window_name, object_name, row_index, column) def getobjectnameatcoords(waitTime=0): # FIXME: Yet to implement in Mac, works on Windows/Linux return _remote_getobjectnameatcoords(waitTime) def generatemouseevent( x, y, event_type="b1c", drag_button_override="drag_default_button" ): return _remote_generatemouseevent(x, y, event_type, drag_button_override) def onwindowcreate(window_name, fn_name, *args): """ On window create, call the function with given arguments @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param fn_name: Callback function @type fn_name: function @param *args: arguments to be passed to the callback function @type *args: var args @return: 1 if registration was successful, 0 if not. @rtype: integer """ _pollEvents._callback[window_name] = ["onwindowcreate", fn_name, args] return _remote_onwindowcreate(window_name) def removecallback(window_name): """ Remove registered callback on window create @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @return: 1 if registration was successful, 0 if not. @rtype: integer """ if window_name in _pollEvents._callback: del _pollEvents._callback[window_name] return _remote_removecallback(window_name) def registerevent(event_name, fn_name, *args): """ Register at-spi event @param event_name: Event name in at-spi format. @type event_name: string @param fn_name: Callback function @type fn_name: function @param *args: arguments to be passed to the callback function @type *args: var args @return: 1 if registration was successful, 0 if not. @rtype: integer """ if not isinstance(event_name, str): raise ValueError("event_name should be string") _pollEvents._callback[event_name] = [event_name, fn_name, args] return _remote_registerevent(event_name) def deregisterevent(event_name): """ Remove callback of registered event @param event_name: Event name in at-spi format. @type event_name: string @return: 1 if registration was successful, 0 if not. @rtype: integer """ if event_name in _pollEvents._callback: del _pollEvents._callback[event_name] return _remote_deregisterevent(event_name) def registerkbevent(keys, modifiers, fn_name, *args): """ Register keystroke events @param keys: key to listen @type keys: string @param modifiers: control / alt combination using gtk MODIFIERS @type modifiers: int @param fn_name: Callback function @type fn_name: function @param *args: arguments to be passed to the callback function @type *args: var args @return: 1 if registration was successful, 0 if not. @rtype: integer """ event_name = "kbevent%s%s" % (keys, modifiers) _pollEvents._callback[event_name] = [event_name, fn_name, args] return _remote_registerkbevent(keys, modifiers) def deregisterkbevent(keys, modifiers): """ Remove callback of registered event @param keys: key to listen @type keys: string @param modifiers: control / alt combination using gtk MODIFIERS @type modifiers: int @return: 1 if registration was successful, 0 if not. @rtype: integer """ event_name = "kbevent%s%s" % (keys, modifiers) if event_name in _pollEvents._callback: del _pollEvents._callback[event_name] return _remote_deregisterkbevent(keys, modifiers) def windowuptime(window_name): """ Get window uptime @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @return: "starttime, endtime" as datetime python object """ tmp_time = _remote_windowuptime(window_name) if tmp_time: tmp_time = tmp_time.split("-") start_time = tmp_time[0].split(" ") end_time = tmp_time[1].split(" ") _start_time = datetime.datetime( int(start_time[0]), int(start_time[1]), int(start_time[2]), int(start_time[3]), int(start_time[4]), int(start_time[5]), ) _end_time = datetime.datetime( int(end_time[0]), int(end_time[1]), int(end_time[2]), int(end_time[3]), int(end_time[4]), int(end_time[5]), ) return _start_time, _end_time return None _populateNamespace(globals()) _pollEvents = PollEvents() _pollEvents.daemon = True _pollEvents.start() _pollLogs = PollLogs() _pollLogs.daemon = True _pollLogs.start() @atexit.register def _stop_thread(): try: _pollLogs.stop() _pollEvents.stop() except: pass atexit.register(client._client.kill_daemon) PKnN8B$%%atomacos/ldtp/client.py# Copyright (c) 2013 Nagappan Alagappan All Rights Reserved. # This file is part of ATOMac. # @author: Eitan Isaacson # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009 Eitan Isaacson # @copyright: Copyright (c) 2009-13 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """client routines for LDTP""" import os import sys import time import signal import platform import subprocess from socket import error as SocketError from atomacos.ldtp import logger from atomacos.ldtp.client_exception import LdtpExecutionError, ERROR_CODE try: import xmlrpclib except ImportError: import xmlrpc.client as xmlrpclib _python3 = False _python26 = False if sys.version_info[:2] <= (2, 6): _python26 = True if sys.version_info[:2] >= (3, 0): _python3 = True _ldtp_windows_env = False if "LDTP_DEBUG" in os.environ: _ldtp_debug = os.environ["LDTP_DEBUG"] else: _ldtp_debug = None if "LDTP_XML_DEBUG" in os.environ: verbose = 1 else: verbose = 0 if "LDTP_SERVER_ADDR" in os.environ: _ldtp_server_addr = os.environ["LDTP_SERVER_ADDR"] else: _ldtp_server_addr = "localhost" if "LDTP_SERVER_PORT" in os.environ: _ldtp_server_port = os.environ["LDTP_SERVER_PORT"] else: _ldtp_server_port = "4118" if "LDTP_WINDOWS" in os.environ or ( sys.platform.find("darwin") == -1 and sys.platform.find("win") != -1 ): if "LDTP_LINUX" in os.environ: _ldtp_windows_env = False else: _ldtp_windows_env = True else: _ldtp_windows_env = False class _Method(xmlrpclib._Method): def __call__(self, *args, **kwargs): if _ldtp_debug: logger.debug( "%s(%s)" % ( self.__name, ", ".join( map(repr, args) + ["%s=%s" % (k, repr(v)) for k, v in kwargs.items()] ), ) ) return self.__send(self.__name, args) class Transport(xmlrpclib.Transport): def _handle_signal(self, signum, frame): if _ldtp_debug: if signum == signal.SIGCHLD: print("ldtpd exited!") elif signum == signal.SIGUSR1: print("SIGUSR1 received. ldtpd ready for requests.") elif signum == signal.SIGALRM: print("SIGALRM received. Timeout waiting for SIGUSR1.") def _spawn_daemon(self): pid = os.getpid() if _ldtp_windows_env: if _ldtp_debug: cmd = "start cmd /K CobraWinLDTP.exe" else: cmd = "CobraWinLDTP.exe" subprocess.Popen(cmd, shell=True) self._daemon = True elif platform.mac_ver()[0] != "": pycmd = ( "import atomac.ldtpd; atomac.ldtpd.main(parentpid=%s)" % pid ) self._daemon = os.spawnlp( os.P_NOWAIT, "python", "python", "-c", pycmd ) else: pycmd = "import ldtpd; ldtpd.main(parentpid=%s)" % pid self._daemon = os.spawnlp( os.P_NOWAIT, "python", "python", "-c", pycmd ) # http://www.itkovian.net/base/transport-class-for-pythons-xml-rpc-lib/ ## # Connect to server. # # @param host Target host. # @return A connection handle. if not _python26 and not _python3: # Add to the class, only if > python 2.5 def make_connection(self, host): # create a HTTP connection object from a host descriptor import httplib host, extra_headers, x509 = self.get_host_info(host) return httplib.HTTPConnection(host) ## # Send a complete request, and parse the response. # # @param host Target host. # @param handler Target PRC handler. # @param request_body XML-RPC request body. # @param verbose Debugging flag. # @return XML response. def request(self, host, handler, request_body, verbose=0): # issue XML-RPC request retry_count = 1 while True: try: if _python26: # Noticed this in Hutlab environment (Windows 7 SP1) # Activestate python 2.5, use the old method return xmlrpclib.Transport.request( self, host, handler, request_body, verbose=verbose ) if not _python3: # Follwing implementation not supported in Python <= 2.6 h = self.make_connection(host) if verbose: h.set_debuglevel(1) self.send_request(h, handler, request_body) self.send_host(h, host) self.send_user_agent(h) self.send_content(h, request_body) else: h = self.send_request( host, handler, request_body, bool(verbose) ) response = h.getresponse() if response.status != 200: raise xmlrpclib.ProtocolError( host + handler, response.status, response.reason, response.msg.headers, ) payload = response.read() parser, unmarshaller = self.getparser() parser.feed(payload) parser.close() return unmarshaller.close() except SocketError as e: if ( (_ldtp_windows_env and e[0] == 10061) or ( hasattr(e, "errno") and (e.errno == 111 or e.errno == 61 or e.errno == 146) ) ) and "localhost" in host: if hasattr(self, "close"): # On Windows XP SP3 / Python 2.5, close doesn't exist self.close() if retry_count == 1: retry_count += 1 if not _ldtp_windows_env: sigusr1 = signal.signal( signal.SIGUSR1, self._handle_signal ) sigalrm = signal.signal( signal.SIGALRM, self._handle_signal ) sigchld = signal.signal( signal.SIGCHLD, self._handle_signal ) self._spawn_daemon() if _ldtp_windows_env: time.sleep(5) else: signal.alarm(15) # Wait 15 seconds for ldtpd signal.pause() # restore signal handlers signal.alarm(0) signal.signal(signal.SIGUSR1, sigusr1) signal.signal(signal.SIGALRM, sigalrm) signal.signal(signal.SIGCHLD, sigchld) continue else: raise # else raise exception raise except xmlrpclib.Fault as e: if hasattr(self, "close"): self.close() if e.faultCode == ERROR_CODE: raise LdtpExecutionError(e.faultString.encode("utf-8")) else: raise e def __del__(self): try: self.kill_daemon() except: # To fix https://github.com/pyatom/pyatom/issues/61 pass def kill_daemon(self): try: if _ldtp_windows_env and self._daemon: # If started by the current current, then terminate # else, silently quit subprocess.Popen( "taskkill /F /IM CobraWinLDTP.exe", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ).communicate() else: os.kill(self._daemon, signal.SIGKILL) except AttributeError: pass class LdtpClient(xmlrpclib.ServerProxy): def __init__(self, uri, encoding=None, verbose=0, use_datetime=0): super(LdtpClient, self).__init__( uri, Transport(), encoding, verbose, 1, use_datetime ) def __getattr__(self, name): # magic method dispatcher return _Method(self._ServerProxy__request, name) def kill_daemon(self): self._ServerProxy__transport.kill_daemon() def setHost(self, host): setattr(self, "_ServerProxy__host", host) _client = LdtpClient( "http://%s:%s" % (_ldtp_server_addr, _ldtp_server_port), verbose=verbose ) PKnNС11!atomacos/ldtp/client_exception.py# Copyright (c) 2013 Nagappan Alagappan All Rights Reserved. # This file is part of ATOMac. # @author: Eitan Isaacson # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009 Eitan Isaacson # @copyright: Copyright (c) 2009-13 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Python LDTP exception""" ERROR_CODE = 123 class LdtpExecutionError(Exception): pass PKnNUaaatomacos/ldtp/log.py# Copyright (c) 2013 Nagappan Alagappan All Rights Reserved. # This file is part of ATOMac. # @author: Eitan Isaacson # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009 Eitan Isaacson # @copyright: Copyright (c) 2009-13 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Log routines for LDTP""" from os import environ as env import logging AREA = "ldtp.client" ENV_LOG_LEVEL = "LDTP_LOG_LEVEL" ENV_LOG_OUT = "LDTP_LOG_OUT" log_level = getattr(logging, env.get(ENV_LOG_LEVEL, "NOTSET"), logging.NOTSET) logger = logging.getLogger(AREA) if ENV_LOG_OUT not in env: handler = logging.StreamHandler() handler.setFormatter( logging.Formatter("%(name)-11s %(levelname)-8s %(message)s") ) else: handler = logging.FileHandler(env[ENV_LOG_OUT]) handler.setFormatter( logging.Formatter("%(asctime)s %(levelname)-8s %(message)s") ) logger.addHandler(handler) logger.setLevel(log_level) PKnN {Ŗatomacos/ldtp/state.py# Copyright (c) 2013 Nagappan Alagappan All Rights Reserved. # This file is part of ATOMac. # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009-13 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Python routines for LDTP""" ICONIFIED = "iconified" INVALID = "invalid" PRESSED = "pressed" EXPANDABLE = "expandable" VISIBLE = "visible" LAST_DEFINED = "last_defined" BUSY = "busy" EXPANDED = "expanded" MANAGES_DESCENDANTS = "manages_descendants" IS_DEFAULT = "is_default" INDETERMINATE = "indeterminate" REQUIRED = "required" TRANSIENT = "transient" CHECKED = "checked" SENSITIVE = "sensitive" COLLAPSED = "collapsed" STALE = "stale" OPAQUE = "opaque" ENABLED = "enabled" HAS_TOOLTIP = "has_tooltip" SUPPORTS_AUTOCOMPLETION = "supports_autocompletion" FOCUSABLE = "focusable" SELECTABLE = "selectable" ACTIVE = "active" HORIZONTAL = "horizontal" VISITED = "visited" INVALID_ENTRY = "invalid_entry" FOCUSED = "focused" MODAL = "modal" VERTICAL = "vertical" SELECTED = "selected" SHOWING = "showing" ANIMATED = "animated" EDITABLE = "editable" MULTI_LINE = "multi_line" SINGLE_LINE = "single_line" SELECTABLE_TEXT = "selectable_text" ARMED = "armed" DEFUNCT = "defunct" MULTISELECTABLE = "multiselectable" RESIZABLE = "resizable" TRUNCATED = "truncated" PKnN|] ] atomacos/ldtpd/__init__.py# Copyright (c) 2012 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Main routines for LDTP daemon. The LDTP daemon listens on a socket for incoming connections. These connections send XMLRPC commands to do perform various GUI automation tasks. It's the server part of a client/server UI automation architecture. """ import os import sys from . import core import time import signal import socket import threading as thread import traceback import SimpleXMLRPCServer from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler # Restrict to a particular path. class RequestHandler(SimpleXMLRPCRequestHandler): rpc_paths = ("/RPC2",) encode_threshold = None class LDTPServer(SimpleXMLRPCServer.SimpleXMLRPCServer): """Class to override some behavior in SimpleXMLRPCServer""" def server_bind(self, *args, **kwargs): """Server Bind. Forces reuse of port.""" self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Can't use super() here since SimpleXMLRPCServer is an old-style class SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind( self, *args, **kwargs ) def notifyclient(parentpid): time.sleep(0.1) os.kill(int(parentpid), signal.SIGUSR1) def main(port=4118, parentpid=None): """Main entry point. Parse command line options and start up a server.""" if os.environ.has_key("LDTP_DEBUG"): _ldtp_debug = True else: _ldtp_debug = False _ldtp_debug_file = os.environ.get("LDTP_DEBUG_FILE", None) server = LDTPServer( ("", port), allow_none=True, logRequests=_ldtp_debug, requestHandler=RequestHandler, ) server.register_introspection_functions() server.register_multicall_functions() ldtp_inst = core.Core() server.register_instance(ldtp_inst) if parentpid: thread.start_new_thread(notifyclient, (parentpid,)) try: server.serve_forever() except KeyboardInterrupt: pass except: if _ldtp_debug: print(traceback.format_exc()) if _ldtp_debug_file: with open(_ldtp_debug_file, "a") as fp: fp.write(traceback.format_exc()) def __main__(): main() PKnNւ;;atomacos/ldtpd/combo_box.py# Copyright (c) 2012 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009-12 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Combobox class.""" import re from atomac import AXKeyCodeConstants from utils import Utils from server_exception import LdtpServerException class ComboBox(Utils): def selectitem(self, window_name, object_name, item_name): """ Select combo box / layered pane item @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @param item_name: Item name to select @type object_name: string @return: 1 on success. @rtype: integer """ object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) self._grabfocus(object_handle.AXWindow) try: object_handle.Press() except AttributeError: # AXPress doesn't work with Instruments # So did the following work around x, y, width, height = self._getobjectsize(object_handle) # Mouse left click on the object # Note: x + width/2, y + height / 2 doesn't work self.generatemouseevent(x + 5, y + 5, "b1c") self.wait(5) handle = self._get_sub_menu_handle(object_handle, item_name) x, y, width, height = self._getobjectsize(handle) # on OSX 10.7 default "b1c" doesn't work # so using "b1d", verified with Fusion test, this works self.generatemouseevent(x + 5, y + 5, "b1d") return 1 # Required for menuitem to appear in accessibility list self.wait(1) menu_list = re.split(";", item_name) try: menu_handle = self._internal_menu_handler( object_handle, menu_list, True ) # Required for menuitem to appear in accessibility list self.wait(1) if not menu_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % menu_list[-1] ) menu_handle.Press() except LdtpServerException: object_handle.activate() object_handle.sendKey(AXKeyCodeConstants.ESCAPE) raise return 1 # Since selectitem and comboselect implementation are same, # for Linux/Windows API compatibility let us assign selectitem to comboselect comboselect = selectitem def selectindex(self, window_name, object_name, item_index): """ Select combo box item / layered pane based on index @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @param item_index: Item index to select @type object_name: integer @return: 1 on success. @rtype: integer """ object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) self._grabfocus(object_handle.AXWindow) try: object_handle.Press() except AttributeError: # AXPress doesn't work with Instruments # So did the following work around x, y, width, height = self._getobjectsize(object_handle) # Mouse left click on the object # Note: x + width/2, y + height / 2 doesn't work self.generatemouseevent(x + 5, y + 5, "b1c") # Required for menuitem to appear in accessibility list self.wait(2) if not object_handle.AXChildren: raise LdtpServerException(u"Unable to find menu") # Get AXMenu children = object_handle.AXChildren[0] if not children: raise LdtpServerException(u"Unable to find menu") children = children.AXChildren tmp_children = [] for child in children: role, label = self._ldtpize_accessible(child) # Don't add empty label # Menu separator have empty label's if label: tmp_children.append(child) children = tmp_children length = len(children) try: if item_index < 0 or item_index > length: raise LdtpServerException( u"Invalid item index %d" % item_index ) menu_handle = children[item_index] if not menu_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % menu_list[-1] ) self._grabfocus(menu_handle) x, y, width, height = self._getobjectsize(menu_handle) # on OSX 10.7 default "b1c" doesn't work # so using "b1d", verified with Fusion test, this works window = object_handle.AXWindow # For some reason, # self.generatemouseevent(x + 5, y + 5, "b1d") # doesn't work with Fusion settings # Advanced window, so work around with this # ldtp.selectindex('*Advanced', 'Automatic', 1) """ Traceback (most recent call last): File "build/bdist.macosx-10.8-intel/egg/atomac/ldtpd/utils.py", line 178, in _dispatch return getattr(self, method)(*args) File "build/bdist.macosx-10.8-intel/egg/atomac/ldtpd/combo_box.py", line 146, in selectindex self.generatemouseevent(x + 5, y + 5, "b1d") File "build/bdist.macosx-10.8-intel/egg/atomac/ldtpd/mouse.py", line 97, in generatemouseevent window=self._get_front_most_window() File "build/bdist.macosx-10.8-intel/egg/atomac/ldtpd/utils.py", line 185, in _get_front_most_window front_app=atomac.NativeUIElement.getFrontmostApp() File "build/bdist.macosx-10.8-intel/egg/atomac/AXClasses.py", line 114, in getFrontmostApp raise ValueError('No GUI application found.') ValueError: No GUI application found. """ window.doubleClickMouse((x + 5, y + 5)) # If menuitem already pressed, set child to None # So, it doesn't click back in combobox in finally block child = None finally: if child: child.Cancel() return 1 # Since selectindex and comboselectindex implementation are same, # for backward compatibility let us assign selectindex to comboselectindex comboselectindex = selectindex def getallitem(self, window_name, object_name): """ Get all combo box item @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: list of string on success. @rtype: list """ object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) object_handle.Press() # Required for menuitem to appear in accessibility list self.wait(1) child = None try: if not object_handle.AXChildren: raise LdtpServerException(u"Unable to find menu") # Get AXMenu children = object_handle.AXChildren[0] if not children: raise LdtpServerException(u"Unable to find menu") children = children.AXChildren items = [] for child in children: label = self._get_title(child) # Don't add empty label # Menu separator have empty label's if label: items.append(label) finally: if child: # Set it back, by clicking combo box child.Cancel() return items def showlist(self, window_name, object_name): """ Show combo box list / menu @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success. @rtype: integer """ object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) object_handle.Press() return 1 def hidelist(self, window_name, object_name): """ Hide combo box list / menu @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success. @rtype: integer """ object_handle = self._get_object_handle(window_name, object_name) object_handle.activate() object_handle.sendKey(AXKeyCodeConstants.ESCAPE) return 1 def verifydropdown(self, window_name, object_name): """ Verify drop down list / menu poped up @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success 0 on failure. @rtype: integer """ try: object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled or not object_handle.AXChildren: return 0 # Get AXMenu children = object_handle.AXChildren[0] if children: return 1 except LdtpServerException: pass return 0 def verifyshowlist(self, window_name, object_name): """ Verify drop down list / menu poped up @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success 0 on failure. @rtype: integer """ return self.verifydropdown(window_name, object_name) def verifyhidelist(self, window_name, object_name): """ Verify list / menu is hidden in combo box @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success 0 on failure. @rtype: integer """ try: object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: return 0 if not object_handle.AXChildren: return 1 # Get AXMenu children = object_handle.AXChildren[0] if not children: return 1 return 1 except LdtpServerException: pass return 0 def verifyselect(self, window_name, object_name, item_name): """ Verify the item selected in combo box @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @param item_name: Item name to select @type object_name: string @return: 1 on success. @rtype: integer """ try: object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: return 0 role, label = self._ldtpize_accessible(object_handle) title = self._get_title(object_handle) if ( re.match(item_name, title, re.M | re.U | re.L) or re.match(item_name, label, re.M | re.U | re.L) or re.match( item_name, u"%u%u" % (role, label), re.M | re.U | re.L ) ): return 1 except LdtpServerException: pass return 0 def getcombovalue(self, window_name, object_name): """ Get current selected combobox value @param window_name: Window name to type in, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to type in, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: selected item on success, else LdtpExecutionError on failure. @rtype: string """ object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) return self._get_title(object_handle) PKnNݜ]Y Y atomacos/ldtpd/constants.py# Copyright (c) 2012 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009-12 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Constants class.""" abbreviated_roles = { "AXWindow": "frm", "AXTextArea": "txt", "AXTextField": "txt", "AXButton": "btn", "AXStaticText": "lbl", "AXRadioButton": "rbtn", "AXSlider": "sldr", "AXCell": "tblc", "AXImage": "img", "AXToolbar": "tbar", "AXScrollBar": "scbr", "AXMenuItem": "mnu", "AXMenu": "mnu", "AXMenuBar": "mnu", "AXMenuBarItem": "mnu", "AXCheckBox": "chk", "AXTabGroup": "ptl", "AXList": "lst", # Not sure what"s the following object equivalent in LDTP "AXMenuButton": "cbo", # Maybe combo-box ? "AXRow": "tblc", "AXColumn": "col", "AXTable": "tbl", "AXScrollArea": "sar", "AXOutline": "otl", "AXValueIndicator": "val", "AXDisclosureTriangle": "dct", "AXGroup": "grp", "AXPopUpButton": "pubtn", "AXApplication": "app", "AXDocItem": "doc", "AXHeading": "tch", "AXGenericElement": "gen", } ldtp_class_type = { "AXWindow": "frame", "AXApplication": "application", "AXTextArea": "text", "AXTextField": "text", "AXButton": "push_button", "AXStaticText": "label", "AXRadioButton": "radion_button", "AXSlider": "slider", "AXCell": "table_cell", "AXImage": "image", "AXToolbar": "toolbar", "AXScrollBar": "scroll_bar", "AXMenuItem": "menu_item", "AXMenu": "menu", "AXMenuBar": "menu_bar", "AXMenuBarItem": "menu_bar_item", "AXCheckBox": "check_box", "AXTabGroup": "page_tab_list", "AXList": "list", "AXColumn": "column", "AXRow": "table_cell", "AXTable": "table", "AXScrollArea": "scroll_area", "AXPopUpButton": "popup_button", "AXDocItem": "doc_item", "AXHeading": "heading", "AXGenericElement": "generic_element", } PKnNg( Batomacos/ldtpd/core.py# -*- coding: utf-8 -*- # Copyright (c) 2012 VMware, Inc. All Rights Reserved. # This file is part of ATOMac. # @author: Nagappan Alagappan # @copyright: Copyright (c) 2009-12 Nagappan Alagappan # http://ldtp.freedesktop.org # ATOMac is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by the Free # Software Foundation version 2 and no later version. # ATOMac is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License version 2 # for more details. # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 Franklin # St, Fifth Floor, Boston, MA 02110-1301 USA. """Core class to be exposed via XMLRPC in LDTP daemon.""" import re import time import atomac import fnmatch import traceback from menu import Menu from text import Text from mouse import Mouse from table import Table from value import Value from generic import Generic from combo_box import ComboBox from constants import ldtp_class_type from page_tab_list import PageTabList from utils import Utils, ProcessStats from server_exception import LdtpServerException try: import psutil except ImportError: pass class Core(ComboBox, Menu, Mouse, PageTabList, Text, Table, Value, Generic): def __init__(self): super(Core, self).__init__() self._process_stats = {} def __del__(self): for key in self._process_stats.keys(): # Stop all process monitoring instances self._process_stats[key].stop() """Core LDTP class""" def appundertest(self, app_name): """ Application under test app_name: Application name should be app identifier eg: com.apple.AppleSpell', 'com.apple.talagent', 'com.apple.dock', 'com.adiumX.adiumX', 'com.apple.notificationcenterui', 'org.3rddev.xchatazure', 'com.skype.skype', 'com.mcafee.McAfeeReporter', 'com.microsoft.outlook.database_daemon', 'com.apple.photostream-agent', 'com.google.GoogleTalkPluginD', 'com.microsoft.SyncServicesAgent', 'com.google.Chrome.helper.EH', 'com.apple.dashboard.client', 'None', 'com.vmware.fusionStartMenu', 'com.apple.ImageCaptureExtension2', 'com.apple.loginwindow', 'com.mozypro.status', 'com.apple.Preview', 'com.google.Chrome.helper', 'com.apple.calculator', 'com.apple.Terminal', 'com.apple.iTunesHelper', 'com.apple.ActivityMonitor', 'net.juniper.NetworkConnect', 'com.google.Chrome', 'com.apple.dock.extra', 'com.apple.finder', 'com.yourcompany.Menulet', 'com.apple.systemuiserver' @return: return 1 on success @rtype: int """ self._app_under_test = app_name return 1 def getapplist(self): """ Get all accessibility application name that are currently running @return: list of appliction name of string type on success. @rtype: list """ app_list = [] # Update apps list, before parsing the list self._update_apps() for gui in self._running_apps: name = gui.localizedName() # default type was objc.pyobjc_unicode # convert to Unicode, else exception is thrown # TypeError: "cannot marshal objects" try: name = unicode(name) except UnicodeEncodeError: pass app_list.append(name) # Return unique application list return list(set(app_list)) def getwindowlist(self): """ Get all accessibility window that are currently open @return: list of window names in LDTP format of string type on success. @rtype: list """ return self._get_windows(True).keys() def isalive(self): """ Client will use this to verify whether the server instance is alive or not. @return: True on success. @rtype: boolean """ return True def poll_events(self): """ Poll for any registered events or window create events @return: window name @rtype: string """ if not self._callback_event: return "" return self._callback_event.pop() def getlastlog(self): """ Returns one line of log at any time, if any available, else empty string @return: log as string @rtype: string """ if not self._custom_logger.log_events: return "" return self._custom_logger.log_events.pop() def startprocessmonitor(self, process_name, interval=2): """ Start memory and CPU monitoring, with the time interval between each process scan @param process_name: Process name, ex: firefox-bin. @type process_name: string @param interval: Time interval between each process scan @type interval: double @return: 1 on success @rtype: integer """ if self._process_stats.has_key(process_name): # Stop previously running instance # At any point, only one process name can be tracked # If an instance already exist, then stop it self._process_stats[process_name].stop() # Create an instance of process stat self._process_stats[process_name] = ProcessStats( process_name, interval ) # start monitoring the process self._process_stats[process_name].start() return 1 def stopprocessmonitor(self, process_name): """ Stop memory and CPU monitoring @param process_name: Process name, ex: firefox-bin. @type process_name: string @return: 1 on success @rtype: integer """ if self._process_stats.has_key(process_name): # Stop monitoring process self._process_stats[process_name].stop() return 1 def getcpustat(self, process_name): """ get CPU stat for the give process name @param process_name: Process name, ex: firefox-bin. @type process_name: string @return: cpu stat list on success, else empty list If same process name, running multiple instance, get the stat of all the process CPU usage @rtype: list """ # Create an instance of process stat _stat_inst = ProcessStats(process_name) _stat_list = [] for p in _stat_inst.get_cpu_memory_stat(): try: _stat_list.append(p.get_cpu_percent()) except psutil.AccessDenied: pass return _stat_list def getmemorystat(self, process_name): """ get memory stat @param process_name: Process name, ex: firefox-bin. @type process_name: string @return: memory stat list on success, else empty list If same process name, running multiple instance, get the stat of all the process memory usage @rtype: list """ # Create an instance of process stat _stat_inst = ProcessStats(process_name) _stat_list = [] for p in _stat_inst.get_cpu_memory_stat(): # Memory percent returned with 17 decimal values # ex: 0.16908645629882812, round it to 2 decimal values # as 0.03 try: _stat_list.append(round(p.get_memory_percent(), 2)) except psutil.AccessDenied: pass return _stat_list def getobjectlist(self, window_name): """ Get list of items in given GUI. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @return: list of items in LDTP naming convention. @rtype: list """ try: window_handle, name, app = self._get_window_handle( window_name, True ) object_list = self._get_appmap(window_handle, name, True) except atomac._a11y.ErrorInvalidUIElement: # During the test, when the window closed and reopened # ErrorInvalidUIElement exception will be thrown self._windows = {} # Call the method again, after updating apps window_handle, name, app = self._get_window_handle( window_name, True ) object_list = self._get_appmap(window_handle, name, True) return object_list.keys() def getobjectinfo(self, window_name, object_name): """ Get object properties. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: list of properties @rtype: list """ try: obj_info = self._get_object_map( window_name, object_name, wait_for_object=False ) except atomac._a11y.ErrorInvalidUIElement: # During the test, when the window closed and reopened # ErrorInvalidUIElement exception will be thrown self._windows = {} # Call the method again, after updating apps obj_info = self._get_object_map( window_name, object_name, wait_for_object=False ) props = [] if obj_info: for obj_prop in obj_info.keys(): if not obj_info[obj_prop] or obj_prop == "obj": # Don't add object handle to the list continue props.append(obj_prop) return props def getobjectproperty(self, window_name, object_name, prop): """ Get object property value. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @param prop: property name. @type prop: string @return: property @rtype: string """ try: obj_info = self._get_object_map( window_name, object_name, wait_for_object=False ) except atomac._a11y.ErrorInvalidUIElement: # During the test, when the window closed and reopened # ErrorInvalidUIElement exception will be thrown self._windows = {} # Call the method again, after updating apps obj_info = self._get_object_map( window_name, object_name, wait_for_object=False ) if obj_info and prop != "obj" and prop in obj_info: if prop == "class": # ldtp_class_type are compatible with Linux and Windows class name # If defined class name exist return that, # else return as it is return ldtp_class_type.get(obj_info[prop], obj_info[prop]) else: return obj_info[prop] raise LdtpServerException( 'Unknown property "%s" in %s' % (prop, object_name) ) def getchild(self, window_name, child_name="", role="", parent=""): """ Gets the list of object available in the window, which matches component name or role name or both. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param child_name: Child name to search for. @type child_name: string @param role: role name to search for, or an empty string for wildcard. @type role: string @param parent: parent name to search for, or an empty string for wildcard. @type role: string @return: list of matched children names @rtype: list """ matches = [] if role: role = re.sub(" ", "_", role) self._windows = {} if parent and (child_name or role): _window_handle, _window_name = self._get_window_handle( window_name )[0:2] if not _window_handle: raise LdtpServerException( 'Unable to find window "%s"' % window_name ) appmap = self._get_appmap(_window_handle, _window_name) obj = self._get_object_map(window_name, parent) def _get_all_children_under_obj(obj, child_list): if role and obj["class"] == role: child_list.append(obj["label"]) elif child_name and self._match_name_to_appmap( child_name, obj ): child_list.append(obj["label"]) if obj: children = obj["children"] if not children: return child_list for child in children.split(): return _get_all_children_under_obj( appmap[child], child_list ) matches = _get_all_children_under_obj(obj, []) if not matches: if child_name: _name = 'name "%s" ' % child_name if role: _role = 'role "%s" ' % role if parent: _parent = 'parent "%s"' % parent exception = "Could not find a child %s%s%s" % ( _name, _role, _parent, ) raise LdtpServerException(exception) return matches _window_handle, _window_name = self._get_window_handle(window_name)[ 0:2 ] if not _window_handle: raise LdtpServerException( 'Unable to find window "%s"' % window_name ) appmap = self._get_appmap(_window_handle, _window_name) for name in appmap.keys(): obj = appmap[name] # When only role arg is passed if role and not child_name and obj["class"] == role: matches.append(name) # When parent and child_name arg is passed if ( parent and child_name and not role and self._match_name_to_appmap(parent, obj) ): matches.append(name) # When only child_name arg is passed if ( child_name and not role and self._match_name_to_appmap(child_name, obj) ): return name matches.append(name) # When role and child_name args are passed if ( role and child_name and obj["class"] == role and self._match_name_to_appmap(child_name, obj) ): matches.append(name) if not matches: _name = "" _role = "" _parent = "" if child_name: _name = 'name "%s" ' % child_name if role: _role = 'role "%s" ' % role if parent: _parent = 'parent "%s"' % parent exception = "Could not find a child %s%s%s" % ( _name, _role, _parent, ) raise LdtpServerException(exception) return matches def launchapp(self, cmd, args=[], delay=0, env=1, lang="C"): """ Launch application. @param cmd: Command line string to execute. @type cmd: string @param args: Arguments to the application @type args: list @param delay: Delay after the application is launched @type delay: int @param env: GNOME accessibility environment to be set or not @type env: int @param lang: Application language to be used @type lang: string @return: 1 on success @rtype: integer @raise LdtpServerException: When command fails """ try: atomac.NativeUIElement.launchAppByBundleId(cmd) return 1 except RuntimeError: if atomac.NativeUIElement.launchAppByBundlePath(cmd, args): # Let us wait so that the application launches try: time.sleep(int(delay)) except ValueError: time.sleep(5) return 1 else: raise LdtpServerException(u"Unable to find app '%s'" % cmd) def wait(self, timeout=5): """ Wait a given amount of seconds. @param timeout: Wait timeout in seconds @type timeout: double @return: 1 @rtype: integer """ time.sleep(timeout) return 1 def closewindow(self, window_name): """ Close window. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @return: 1 on success. @rtype: integer """ return self._singleclick(window_name, "btnclosebutton") def minimizewindow(self, window_name): """ Minimize window. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @return: 1 on success. @rtype: integer """ return self._singleclick(window_name, "btnminimizebutton") def maximizewindow(self, window_name): """ Maximize window. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @return: 1 on success. @rtype: integer """ return self._singleclick(window_name, "btnzoombutton") def activatewindow(self, window_name): """ Activate window. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @return: 1 on success. @rtype: integer """ window_handle = self._get_window_handle(window_name) self._grabfocus(window_handle) return 1 def click(self, window_name, object_name): """ Click item. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success. @rtype: integer """ object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) size = self._getobjectsize(object_handle) self._grabfocus(object_handle) self.wait(0.5) # If object doesn't support Press, trying clicking with the object # coordinates, where size=(x, y, width, height) # click on center of the widget # Noticed this issue on clicking AXImage # click('Instruments*', 'Automation') self.generatemouseevent( size[0] + size[2] / 2, size[1] + size[3] / 2, "b1c" ) return 1 def getallstates(self, window_name, object_name): """ Get all states of given object @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: list of string on success. @rtype: list """ object_handle = self._get_object_handle(window_name, object_name) _obj_states = [] if object_handle.AXEnabled: _obj_states.append("enabled") if object_handle.AXFocused: _obj_states.append("focused") else: try: if object_handle.AXFocused: _obj_states.append("focusable") except: pass if re.match( "AXCheckBox", object_handle.AXRole, re.M | re.U | re.L ) or re.match( "AXRadioButton", object_handle.AXRole, re.M | re.U | re.L ): if object_handle.AXValue: _obj_states.append("checked") return _obj_states def hasstate(self, window_name, object_name, state, guiTimeOut=0): """ has state @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @type window_name: string @param state: State of the current object. @type object_name: string @param guiTimeOut: Wait timeout in seconds @type guiTimeOut: integer @return: 1 on success. @rtype: integer """ try: object_handle = self._get_object_handle(window_name, object_name) if state == "enabled": return int(object_handle.AXEnabled) elif state == "focused": return int(object_handle.AXFocused) elif state == "focusable": return int(object_handle.AXFocused) elif state == "checked": if re.match( "AXCheckBox", object_handle.AXRole, re.M | re.U | re.L ) or re.match( "AXRadioButton", object_handle.AXRole, re.M | re.U | re.L ): if object_handle.AXValue: return 1 except: pass return 0 def getobjectsize(self, window_name, object_name=None): """ Get object size @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. Or menu heirarchy @type object_name: string @return: x, y, width, height on success. @rtype: list """ if not object_name: handle, name, app = self._get_window_handle(window_name) else: handle = self._get_object_handle(window_name, object_name) return self._getobjectsize(handle) def getwindowsize(self, window_name): """ Get window size. @param window_name: Window name to get size of. @type window_name: string @return: list of dimensions [x, y, w, h] @rtype: list """ return self.getobjectsize(window_name) def grabfocus(self, window_name, object_name=None): """ Grab focus. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success. @rtype: integer """ if not object_name: handle, name, app = self._get_window_handle(window_name) else: handle = self._get_object_handle(window_name, object_name) return self._grabfocus(handle) def guiexist(self, window_name, object_name=None): """ Checks whether a window or component exists. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success. @rtype: integer """ try: self._windows = {} if not object_name: handle, name, app = self._get_window_handle(window_name, False) else: handle = self._get_object_handle( window_name, object_name, wait_for_object=False, force_remap=True, ) # If window and/or object exist, exception will not be thrown # blindly return 1 return 1 except LdtpServerException: pass return 0 def guitimeout(self, timeout): """ Change GUI timeout period, default 30 seconds. @param timeout: timeout in seconds @type timeout: integer @return: 1 on success. @rtype: integer """ self._window_timeout = timeout return 1 def objtimeout(self, timeout): """ Change object timeout period, default 5 seconds. @param timeout: timeout in seconds @type timeout: integer @return: 1 on success. @rtype: integer """ self._obj_timeout = timeout return 1 def waittillguiexist( self, window_name, object_name="", guiTimeOut=30, state="" ): """ Wait till a window or component exists. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @param guiTimeOut: Wait timeout in seconds @type guiTimeOut: integer @param state: Object state used only when object_name is provided. @type object_name: string @return: 1 if GUI was found, 0 if not. @rtype: integer """ timeout = 0 while timeout < guiTimeOut: if self.guiexist(window_name, object_name): return 1 # Wait 1 second before retrying time.sleep(1) timeout += 1 # Object and/or window doesn't appear within the timeout period return 0 def waittillguinotexist(self, window_name, object_name="", guiTimeOut=30): """ Wait till a window does not exist. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @param guiTimeOut: Wait timeout in seconds @type guiTimeOut: integer @return: 1 if GUI has gone away, 0 if not. @rtype: integer """ timeout = 0 while timeout < guiTimeOut: if not self.guiexist(window_name, object_name): return 1 # Wait 1 second before retrying time.sleep(1) timeout += 1 # Object and/or window still appears within the timeout period return 0 def objectexist(self, window_name, object_name): """ Checks whether a window or component exists. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 if GUI was found, 0 if not. @rtype: integer """ try: object_handle = self._get_object_handle(window_name, object_name) return 1 except LdtpServerException: return 0 def stateenabled(self, window_name, object_name): """ Check whether an object state is enabled or not @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success 0 on failure. @rtype: integer """ try: object_handle = self._get_object_handle(window_name, object_name) if object_handle.AXEnabled: return 1 except LdtpServerException: pass return 0 def check(self, window_name, object_name): """ Check item. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success. @rtype: integer """ # FIXME: Check for object type object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) if object_handle.AXValue == 1: # Already checked return 1 # AXPress doesn't work with Instruments # So did the following work around self._grabfocus(object_handle) x, y, width, height = self._getobjectsize(object_handle) # Mouse left click on the object # Note: x + width/2, y + height / 2 doesn't work self.generatemouseevent(x + width / 2, y + height / 2, "b1c") return 1 def uncheck(self, window_name, object_name): """ Uncheck item. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success. @rtype: integer """ object_handle = self._get_object_handle(window_name, object_name) if not object_handle.AXEnabled: raise LdtpServerException( u"Object %s state disabled" % object_name ) if object_handle.AXValue == 0: # Already unchecked return 1 # AXPress doesn't work with Instruments # So did the following work around self._grabfocus(object_handle) x, y, width, height = self._getobjectsize(object_handle) # Mouse left click on the object # Note: x + width/2, y + height / 2 doesn't work self.generatemouseevent(x + width / 2, y + height / 2, "b1c") return 1 def verifycheck(self, window_name, object_name): """ Verify check item. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success 0 on failure. @rtype: integer """ try: object_handle = self._get_object_handle( window_name, object_name, wait_for_object=False ) if object_handle.AXValue == 1: return 1 except LdtpServerException: pass return 0 def verifyuncheck(self, window_name, object_name): """ Verify uncheck item. @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. @type object_name: string @return: 1 on success 0 on failure. @rtype: integer """ try: object_handle = self._get_object_handle( window_name, object_name, wait_for_object=False ) if object_handle.AXValue == 0: return 1 except LdtpServerException: pass return 0 def getaccesskey(self, window_name, object_name): """ Get access key of given object @param window_name: Window name to look for, either full name, LDTP's name convention, or a Unix glob. @type window_name: string @param object_name: Object name to look for, either full name, LDTP's name convention, or a Unix glob. Or menu heirarchy @type object_name: string @return: access key in string format on success, else LdtpExecutionError on failure. @rtype: string """ # Used http://www.danrodney.com/mac/ as reference for # mapping keys with specific control # In Mac noticed (in accessibility inspector) only menu had access keys # so, get the menu_handle of given object and # return the access key menu_handle = self._get_menu_handle(window_name, object_name) key = menu_handle.AXMenuItemCmdChar modifiers = menu_handle.AXMenuItemCmdModifiers glpyh = menu_handle.AXMenuItemCmdGlyph virtual_key = menu_handle.AXMenuItemCmdVirtualKey modifiers_type = "" if modifiers == 0: modifiers_type = "" elif modifiers == 1: modifiers_type = "" elif modifiers == 2: modifiers_type = "