PK!sansio_lsp_client/__init__.py"""An implementation of the client side of the LSP protocol, useful for embedding easily in your editor.""" from .client import Client from .events import * from .structs import * __version__ = "0.2.2" PK!́ sansio_lsp_client/__main__.py#!/usr/bin/env python3 # just a canvas to play in. import os import pprint import socket import urllib.request from .client import Client from .errors import IncompleteResponseError from .events import Initialized, Shutdown, ShowMessageRequest, Completion from .structs import ( CompletionTriggerKind, TextDocumentItem, TextDocumentPosition, CompletionContext, TextDocumentIdentifier, VersionedTextDocumentIdentifier, TextDocumentContentChangeEvent, TextDocumentSaveReason, Position, ) def main() -> None: sock = socket.socket() sock.connect(("localhost", int(os.environ.get("PORT", 8080)))) client = Client(trace="verbose") file_path = "./playground.py" file_uri = "file://" + urllib.request.pathname2url( os.path.abspath(file_path) ) print("File URI:", file_uri) while True: sock.sendall(client.send()) try: data = sock.recv(4096) if not data: break events = list(client.recv(data)) except IncompleteResponseError as e: continue for event in events: if isinstance(event, Initialized): print("Initialized!") print("Server capabilities:") pprint.pprint(event.capabilities) client.did_open( TextDocumentItem( uri=file_uri, languageId="python", text=open(file_path).read(), version=0, ) ) client.completions( text_document_position=TextDocumentPosition( textDocument=TextDocumentIdentifier(uri=file_uri), position=Position( line=5, character=4 + len("struct.") + 1 ), ), context=CompletionContext( triggerKind=CompletionTriggerKind.INVOKED ), ) elif isinstance(event, Shutdown): print("Shutdown and exiting") client.exit() elif isinstance(event, Completion): print("Completions:") pprint.pprint( [item.label for item in event.completion_list.items] ) client.did_close( text_document=TextDocumentIdentifier(uri=file_uri) ) client.will_save( text_document=TextDocumentIdentifier(uri=file_uri), reason=TextDocumentSaveReason.MANUAL, ) client.did_save( text_document=TextDocumentIdentifier(uri=file_uri) ) client.shutdown() client.cancel_last_request() else: raise NotImplementedError(event) if __name__ == "__main__": main() PK!,0 l&l&sansio_lsp_client/client.pyimport enum import typing as t import cattr from .events import ( Initialized, Completion, ServerRequest, Shutdown, Event, ShowMessage, ServerNotification, WillSaveWaitUntilEdits, ShowMessageRequest, LogMessage, ) from .structs import ( Response, TextDocumentPosition, CompletionContext, CompletionList, CompletionItem, Request, JSONDict, MessageActionItem, MessageType, TextDocumentItem, TextDocumentIdentifier, VersionedTextDocumentIdentifier, TextDocumentContentChangeEvent, TextDocumentSaveReason, ) from .io_handler import _make_request, _parse_messages, _make_response class ClientState(enum.Enum): NOT_INITIALIZED = enum.auto() WAITING_FOR_INITIALIZED = enum.auto() NORMAL = enum.auto() WAITING_FOR_SHUTDOWN = enum.auto() SHUTDOWN = enum.auto() EXITED = enum.auto() class Client: # TODO: Save the encoding given here. def __init__( self, process_id: int = None, root_uri: str = None, trace: str = "off" ) -> None: self._state = ClientState.NOT_INITIALIZED # Used to save data as it comes in (from `recieve_bytes`) until we have # a full request. self._recv_buf = bytearray() # Things that we still need to send. self._send_buf = bytearray() # Keeps track of which IDs match to which unanswered requests. self._unanswered_requests: t.Dict[int, Request] = {} # Just a simple counter to make sure we have unique IDs. We could make # sure that this fits into a JSONRPC Number, seeing as Python supports # bignums, but I think that's an unlikely enough case that checking for # it would just litter the code unnecessarily. self._id_counter = 0 # We'll just immediately send off an "initialize" request. self._send_request( method="initialize", params={ "processId": process_id, "rootUri": root_uri, "capabilities": {}, "trace": trace, }, ) self._state = ClientState.WAITING_FOR_INITIALIZED def _send_request(self, method: str, params: JSONDict = None) -> None: request = _make_request( method=method, params=params, id=self._id_counter ) self._send_buf += request self._unanswered_requests[self._id_counter] = Request( id=self._id_counter, method=method, params=params ) self._id_counter += 1 def _send_notification(self, method: str, params: JSONDict = None) -> None: self._send_buf += _make_request(method=method, params=params) def _send_response( self, id: int, result: JSONDict = None, error: JSONDict = None ) -> None: self._send_buf += _make_response(id=id, result=result, error=error) def recv(self, data: bytes) -> t.Iterator[Event]: self._recv_buf += data # We must exhaust the generator so IncompleteResponseError # is raised before we actually process anything. messages = list(_parse_messages(self._recv_buf)) # If we get here, that means the previous line didn't error out so we # can just clear whatever we were holding. self._recv_buf.clear() for message in messages: if isinstance(message, Response): response = message request = self._unanswered_requests.pop(response.id) # FIXME: The errors have meanings. if response.error is not None: __import__("pprint").pprint(response.error) raise RuntimeError("Response error!") if request.method == "initialize": assert self._state == ClientState.WAITING_FOR_INITIALIZED self._send_notification("initialized") yield cattr.structure(response.result, Initialized) self._state = ClientState.NORMAL elif request.method == "shutdown": assert self._state == ClientState.WAITING_FOR_SHUTDOWN yield Shutdown() self._state = ClientState.SHUTDOWN elif request.method == "textDocument/completion": completion_list = None try: completion_list = cattr.structure( response.result, CompletionList ) except TypeError: try: completion_list = cattr.structure( response.result, t.List[CompletionItem] ) except TypeError: assert response.result is None yield Completion(completion_list) elif request.method == "textDocument/willSaveWaitUntil": yield WillSaveWaitUntilEdits(edits=response.result) else: raise NotImplementedError((response, request)) elif isinstance(message, Request): request = message E = t.TypeVar("E", bound=Event) def structure_request(event_cls: t.Type[E]) -> E: if issubclass(event_cls, ServerRequest): event = cattr.structure(request.params, event_cls) event._id = request.id event._client = self return event elif issubclass(event_cls, ServerNotification): return cattr.structure(request.params, event_cls) else: raise TypeError( "`event_cls` must be a subclass of ServerRequest or ServerNotification" ) if request.method == "window/showMessage": yield structure_request(ShowMessage) elif request.method == "window/showMessageRequest": yield structure_request(ShowMessageRequest) elif request.method == "window/logMessage": yield structure_request(LogMessage) else: raise NotImplementedError(request) else: raise RuntimeError("nobody will ever see this, i hope") def send(self) -> bytes: send_buf = self._send_buf[:] self._send_buf.clear() return send_buf def shutdown(self) -> None: assert self._state == ClientState.NORMAL self._send_request(method="shutdown") self._state = ClientState.WAITING_FOR_SHUTDOWN def exit(self) -> None: assert self._state == ClientState.SHUTDOWN self._send_notification(method="exit") self._state = ClientState.EXITED def cancel_last_request(self) -> None: self._send_notification( method="$/cancelRequest", params={"id": self._id_counter - 1} ) def did_open(self, text_document: TextDocumentItem) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/didOpen", params={"textDocument": cattr.unstructure(text_document)}, ) def did_change( self, text_document: VersionedTextDocumentIdentifier, content_changes: t.List[TextDocumentContentChangeEvent], ) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/didChange", params={ "textDocument": cattr.unstructure(text_document), "contentChanges": cattr.unstructure(content_changes), }, ) def will_save( self, text_document: TextDocumentIdentifier, reason: TextDocumentSaveReason, ) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/willSave", params={ "textDocument": cattr.unstructure(text_document), "reason": cattr.unstructure(reason), }, ) def will_save_wait_until( self, text_document: TextDocumentIdentifier, reason: TextDocumentSaveReason, ) -> None: assert self._state == ClientState.NORMAL self._send_request( method="textDocument/willSaveWaitUntil", params={ "textDocument": cattr.unstructure(text_document), "reason": cattr.unstructure(reason), }, ) def did_save( self, text_document: TextDocumentIdentifier, text: str = None ) -> None: assert self._state == ClientState.NORMAL params = {"textDocument": cattr.unstructure(text_document)} if text is not None: params["text"] = text self._send_notification(method="textDocument/didSave", params=params) def did_close(self, text_document: TextDocumentIdentifier) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/didClose", params={"textDocument": cattr.unstructure(text_document)}, ) def completions( self, text_document_position: TextDocumentPosition, context: CompletionContext = None, ) -> None: assert self._state == ClientState.NORMAL params = {} params.update(cattr.unstructure(text_document_position)) if context is not None: params.update(cattr.unstructure(context)) self._send_request(method="textDocument/completion", params=params) PK!{433sansio_lsp_client/errors.pyclass IncompleteResponseError(Exception): pass PK!)sansio_lsp_client/events.pyimport typing as t import attr from attr import attrs, attrib from .structs import ( JSONDict, MessageType, MessageActionItem, CompletionItem, CompletionList, TextEdit, ) @attrs class Event: pass @attrs class ServerRequest(Event): _client: "Client" = attrib(init=False) _id: int = attrib(init=False) @attrs class ServerNotification(Event): pass @attrs class Initialized(Event): capabilities: JSONDict = attrib() @attrs class Shutdown(Event): pass @attrs class ShowMessage(ServerNotification): type: MessageType = attrib() message: str = attrib() @attrs class ShowMessageRequest(ServerRequest): type: MessageType = attrib() message: str = attrib() actions: t.Optional[t.List[MessageActionItem]] = attrib() def reply(self, action: MessageActionItem = None) -> None: """ Reply to the ShowMessageRequest with the user's selection. No bytes are actually returned from this method, the reply's bytes are added to the client's internal send buffer. """ self._client._send_response(id=self._id, result=attr.asdict(action)) @attrs class LogMessage(ServerNotification): type: MessageType = attrib() message: str = attrib() @attrs class Completion: completion_list: t.Union[ CompletionList, t.List[CompletionItem], None ] = attrib() # XXX: not sure how to name this event. @attrs class WillSaveWaitUntilEdits: edits: t.Optional[t.List[TextEdit]] = attrib() PK!%^^sansio_lsp_client/io_handler.pyimport cgi import json import typing as t import cattr from .structs import Request, Response, JSONDict from .errors import IncompleteResponseError def _make_headers(content_length: int, encoding: str = "utf-8") -> bytes: headers_bytes = bytearray() headers = { "Content-Length": content_length, "Content-Type": f"application/vscode-jsonrpc; charset={encoding}", } for (key, value) in headers.items(): headers_bytes += f"{key}: {value}\r\n".encode(encoding) headers_bytes += b"\r\n" return headers_bytes def _make_request( method: str, params: JSONDict = None, id: int = None, *, encoding: str = "utf-8", ) -> bytes: request = bytearray() # Set up the actual JSONRPC content and encode it. content: JSONDict = {"jsonrpc": "2.0", "method": method} if params is not None: content["params"] = params if id is not None: content["id"] = id encoded_content = json.dumps(content).encode(encoding) # Write the headers to the request body request += _make_headers( content_length=len(encoded_content), encoding=encoding ) # Append the content to the request request += encoded_content return request def _make_response( id: int, result: JSONDict = None, error: JSONDict = None, *, encoding: str = "utf-8", ) -> bytes: request = bytearray() # Set up the actual JSONRPC content and encode it. content: JSONDict = {"jsonrpc": "2.0", "id": id} if result is not None: content["result"] = result if error is not None: content["error"] = error encoded_content = json.dumps(content).encode(encoding) # Write the headers to the request body request += _make_headers( content_length=len(encoded_content), encoding=encoding ) # Append the content to the request request += encoded_content return request def _parse_messages(response: bytes) -> t.Iterator[t.Union[Response, Request]]: if b"\r\n\r\n" not in response: raise IncompleteResponseError("Incomplete headers") header_lines, raw_content = response.split(b"\r\n\r\n", 1) # Parse the headers. headers = {} for header_line in header_lines.split(b"\r\n"): key, value = header_line.decode("ascii").split(": ", 1) headers[key] = value # We will now parse the Content-Type and Content-Length headers. Since for # version 3.0 of the Language Server Protocol they're the only ones, we can # just verify they're there and not keep them around in the Response # object. assert set(headers.keys()) == {"Content-Type", "Content-Length"} # Content-Type and encoding. content_type, metadata = cgi.parse_header(headers["Content-Type"]) assert content_type == "application/vscode-jsonrpc" encoding = metadata["charset"] # Content-Length content_length = int(headers["Content-Length"]) # We need to verify that the raw_content is long enough, seeing as we might # be getting an incomplete request. if len(raw_content) < content_length: raise IncompleteResponseError( "Not enough bytes to fulfill Content-Length requirements." ) # Take only as many bytes as we need. If there's any remaining, they're # the next response's. raw_content, next_response = ( raw_content[:content_length], raw_content[content_length:], ) def do_it(data: JSONDict) -> t.Union[Response, Request]: del data["jsonrpc"] try: response = cattr.structure(data, Response) return response except TypeError: pass try: request = cattr.structure(data, Request) return request except TypeError: pass raise RuntimeError(f"{data!r} is neither a Request nor a Response!") content = json.loads(raw_content.decode(encoding)) if isinstance(content, list): # This is in response to a batch operation. yield from map(do_it, content) else: yield do_it(content) if next_response: yield from _parse_messages(next_response) PK!c9sansio_lsp_client/structs.pyimport enum import typing as t from attr import attrs, attrib # XXX: Replace the non-commented-out code with what's commented out once nested # types become a thing in mypy. # JSONValue = t.Union[None, str, int, t.List['JSONValue'], t.Dict[str, 'JSONValue']] # JSONDict = t.Dict[str, JSONValue] JSONDict = t.Dict[str, t.Any] # XXX: We can't have this be both str and int due to `cattrs` restrictions. How # can we fix this? # Id = t.Union[str, int] Id = int @attrs class Request: method: str = attrib() id: t.Optional[Id] = attrib(default=None) params: t.Optional[JSONDict] = attrib(default=None) @attrs class Response: id: t.Optional[Id] = attrib(default=None) result: t.Optional[JSONDict] = attrib(default=None) error: t.Optional[JSONDict] = attrib(default=None) class MessageType(enum.IntEnum): ERROR = 1 WARNING = 2 INFO = 3 LOG = 4 @attrs class MessageActionItem: title: str = attrib() @attrs class TextDocumentItem: uri: str = attrib() languageId: str = attrib() version: int = attrib() text: str = attrib() @attrs class TextDocumentIdentifier: uri: str = attrib() @attrs class VersionedTextDocumentIdentifier(TextDocumentIdentifier): version: t.Optional[int] = attrib(default=None) @attrs class Position: # NB: These are both zero-based. line: int = attrib() character: int = attrib() @attrs class Range: start: Position = attrib() end: Position = attrib() def __len__(self) -> int: raise NotImplementedError( "i as a developer am too stupid to figure out how to implement this" ) @attrs class TextDocumentContentChangeEvent: text: str = attrib() range: t.Optional[Range] = attrib(default=None) rangeLength: t.Optional[int] = attrib(default=None) def change_range( cls, change_start: Position, change_end: Position, change_text: str ) -> "TextDocumentContentChangeEvent": change_range = Range(change_start, change_end) return cls( range=change_range, rangeLength=len(change_range), text=change_text ) def change_whole_document( cls, change_text: str ) -> "TextDocumentContentChangeEvent": return cls(text=change_text) @attrs class TextDocumentPosition: textDocument: TextDocumentIdentifier = attrib() position: Position = attrib() class CompletionTriggerKind(enum.IntEnum): INVOKED = 1 TRIGGER_CHARACTER = 2 TRIGGER_FOR_INCOMPLETE_COMPLETIONS = 3 @attrs class CompletionContext: triggerKind: CompletionTriggerKind = attrib() triggerCharacter: t.Optional[str] = attrib(default=None) class MarkupKind(enum.Enum): PLAINTEXT = "plaintext" MARKDOWN = "markdown" @attrs class MarkupContent: kind: MarkupKind = attrib() value: str = attrib() @attrs class TextEdit: range: Range = attrib() newText: str = attrib() @attrs class Command: title: str = attrib() command: str = attrib() arguments: t.Optional[t.List[t.Any]] = attrib(default=None) class InsertTextFormat(enum.IntEnum): PLAIN_TEXT = 1 SNIPPET = 2 @attrs class CompletionItem: label: str = attrib() # TODO: implement CompletionItemKind. kind: t.Optional[int] = attrib(default=None) detail: t.Optional[str] = attrib(default=None) # FIXME: Allow `t.Union[str, MarkupContent]` here by defining a cattrs custom # loads hook. documentation: t.Optional[str] = attrib(default=None) deprecated: t.Optional[bool] = attrib(default=None) preselect: t.Optional[bool] = attrib(default=None) sortText: t.Optional[str] = attrib(default=None) filterText: t.Optional[str] = attrib(default=None) insertText: t.Optional[str] = attrib(default=None) insertTextFormat: t.Optional[InsertTextFormat] = attrib(default=None) textEdit: t.Optional[TextEdit] = attrib(default=None) additionalTextEdits: t.Optional[t.List[TextEdit]] = attrib(default=None) commitCharacters: t.Optional[t.List[str]] = attrib(default=None) command: t.Optional[Command] = attrib(default=None) data: t.Optional[t.Any] = attrib(default=None) @attrs class CompletionList: isIncomplete: bool = attrib() items: t.List[CompletionItem] = attrib() class TextDocumentSaveReason(enum.IntEnum): MANUAL = 1 AFTER_DELAY = 2 FOCUS_OUT = 3 PK!.i%++)sansio_lsp_client-0.2.2.dist-info/LICENSEMIT License Copyright (c) 2018 PurpleMyst Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H_zTT'sansio_lsp_client-0.2.2.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]n0H*J>mlcAPK!H,^(*sansio_lsp_client-0.2.2.dist-info/METADATA[O0+F 6NHDXCQw˪n٬ouP)[2s0NEU@b]sYob\.C.GQ%r.6ZEC 6T [;Xw85ѷ 1n'u@;|"NGOR|L1pSt+[ou 8s/uquu#3\\FGwOqVULq G|ǟIr5X~F1| 7Wy-؍sjaYIY;iyCƯ?z.5TptvZ,5?;d]KtcS\x7CȯW,s:xB"śDdOPK!H&<%(sansio_lsp_client-0.2.2.dist-info/RECORDˎ@}? t%,fZ*FMO?L:cYL*9ۤnI%j1 :pmH U7%S> jW Ǧd)oi v$A~i'*!7^IR t/UyYy^ M<."{`6}}%pBqH>]?H9Mp(mhdڻ:f0 |9t[tk ƃE"N, ۙz9l'"4٘)Jj='*0a~nyRˌ>0 !H>ZZWtN"qkE,jSnhwv @%t͵&N>ѷ+T^(Q@?Q^x>ߑ'yrv+ y^*kVi]|ڕ^-(78Q !_л;qQӕ QJ=Q-Pb9 )8pDy6_}&G߻7tܱl/T;yU?sG丗?PK!sansio_lsp_client/__init__.pyPK!́ sansio_lsp_client/__main__.pyPK!,0 l&l& sansio_lsp_client/client.pyPK!{4333sansio_lsp_client/errors.pyPK!)04sansio_lsp_client/events.pyPK!%^^O:sansio_lsp_client/io_handler.pyPK!c9Jsansio_lsp_client/structs.pyPK!.i%++)%\sansio_lsp_client-0.2.2.dist-info/LICENSEPK!H_zTT'`sansio_lsp_client-0.2.2.dist-info/WHEELPK!H,^(*0asansio_lsp_client-0.2.2.dist-info/METADATAPK!H&<%(csansio_lsp_client-0.2.2.dist-info/RECORDPK b^e