PK!#+sansio_lsp_client/__init__.py"""An implementation of the client side of the LSP protocol, useful for embedding easily in your editor.""" __version__ = "0.1.0" PK!́ sansio_lsp_client/__main__.py#!/usr/bin/env python3 # just a canvas to play in. import os import pprint import socket import urllib.request from .client import Client from .errors import IncompleteResponseError from .events import Initialized, Shutdown, ShowMessageRequest, Completion from .structs import ( CompletionTriggerKind, TextDocumentItem, TextDocumentPosition, CompletionContext, TextDocumentIdentifier, VersionedTextDocumentIdentifier, TextDocumentContentChangeEvent, TextDocumentSaveReason, Position, ) def main() -> None: sock = socket.socket() sock.connect(("localhost", int(os.environ.get("PORT", 8080)))) client = Client(trace="verbose") file_path = "./playground.py" file_uri = "file://" + urllib.request.pathname2url( os.path.abspath(file_path) ) print("File URI:", file_uri) while True: sock.sendall(client.send()) try: data = sock.recv(4096) if not data: break events = list(client.recv(data)) except IncompleteResponseError as e: continue for event in events: if isinstance(event, Initialized): print("Initialized!") print("Server capabilities:") pprint.pprint(event.capabilities) client.did_open( TextDocumentItem( uri=file_uri, languageId="python", text=open(file_path).read(), version=0, ) ) client.completions( text_document_position=TextDocumentPosition( textDocument=TextDocumentIdentifier(uri=file_uri), position=Position( line=5, character=4 + len("struct.") + 1 ), ), context=CompletionContext( triggerKind=CompletionTriggerKind.INVOKED ), ) elif isinstance(event, Shutdown): print("Shutdown and exiting") client.exit() elif isinstance(event, Completion): print("Completions:") pprint.pprint( [item.label for item in event.completion_list.items] ) client.did_close( text_document=TextDocumentIdentifier(uri=file_uri) ) client.will_save( text_document=TextDocumentIdentifier(uri=file_uri), reason=TextDocumentSaveReason.MANUAL, ) client.did_save( text_document=TextDocumentIdentifier(uri=file_uri) ) client.shutdown() client.cancel_last_request() else: raise NotImplementedError(event) if __name__ == "__main__": main() PK!yp&p&sansio_lsp_client/client.pyimport enum import typing as t import cattr from .events import ( Initialized, Completion, ServerRequest, Shutdown, Event, ShowMessage, ServerNotification, WillSaveWaitUntilEdits, ShowMessageRequest, LogMessage, ) from .structs import ( Response, TextDocumentPosition, CompletionContext, CompletionList, CompletionItem, Request, JSONDict, MessageActionItem, MessageType, TextDocumentItem, TextDocumentIdentifier, VersionedTextDocumentIdentifier, TextDocumentContentChangeEvent, TextDocumentSaveReason, ) from .io_handler import _make_request, _parse_messages, _make_response class ClientState(enum.Enum): NOT_INITIALIZED = enum.auto() WAITING_FOR_INITIALIZED = enum.auto() NORMAL = enum.auto() WAITING_FOR_SHUTDOWN = enum.auto() SHUTDOWN = enum.auto() EXITED = enum.auto() class Client: # TODO: Save the encoding given here. def __init__( self, process_id: int = None, root_uri: str = None, trace: str = "off" ) -> None: self._state = ClientState.NOT_INITIALIZED # Used to save data as it comes in (from `recieve_bytes`) until we have # a full request. self._recv_buf = bytearray() # Things that we still need to send. self._send_buf = bytearray() # Keeps track of which IDs match to which unanswered requests. self._unanswered_requests: t.Dict[int, Request] = {} # Just a simple counter to make sure we have unique IDs. We could make # sure that this fits into a JSONRPC Number, seeing as Python supports # bignums, but I think that's an unlikely enough case that checking for # it would just litter the code unnecessarily. self._id_counter = 0 # We'll just immediately send off an "initialize" request. self._send_request( method="initialize", params={ "processId": process_id, "rootUri": root_uri, "capabilities": {}, "trace": trace, }, ) self._state = ClientState.WAITING_FOR_INITIALIZED def _send_request(self, method: str, params: JSONDict = None) -> None: request = _make_request( method=method, params=params, id=self._id_counter ) self._send_buf += request self._unanswered_requests[self._id_counter] = Request( id=self._id_counter, method=method, params=params ) self._id_counter += 1 def _send_notification(self, method: str, params: JSONDict = None) -> None: self._send_buf += _make_request(method=method, params=params) def _send_response( self, id: int, result: JSONDict = None, error: JSONDict = None ) -> None: self._send_buf += _make_response(id=id, result=result, error=error) def recv(self, data: bytes) -> t.Iterator[Event]: self._recv_buf += data # We must exhaust the generator so IncompleteResponseError # is raised before we actually process anything. messages = list(_parse_messages(self._recv_buf)) # If we get here, that means the previous line didn't error out so we # can just clear whatever we were holding. self._recv_buf.clear() for message in messages: if isinstance(message, Response): response = message request = self._unanswered_requests.pop(response.id) # FIXME: The errors have meanings. if response.error is not None: __import__("pprint").pprint(response.error) raise RuntimeError("Response error!") if request.method == "initialize": assert self._state == ClientState.WAITING_FOR_INITIALIZED self._send_notification("initialized") yield cattr.structure(response.result, Initialized) self._state = ClientState.NORMAL elif request.method == "shutdown": assert self._state == ClientState.WAITING_FOR_SHUTDOWN yield Shutdown() self._state = ClientState.SHUTDOWN elif request.method == "textDocument/completion": completion_list = None try: completion_list = cattr.structure( response.result, CompletionList ) except TypeError: try: completion_list = cattr.structure( response.result, t.List[CompletionItem] ) except TypeError: assert response.result is None yield Completion(completion_list) elif request.method == "textDocument/willSaveWaitUntil": yield WillSaveWaitUntilEdits(edits=response.result) else: raise NotImplementedError((response, request)) elif isinstance(message, Request): request = message E = t.TypeVar("E", bound=Event) def structure_request(event_cls: t.Type[E]) -> E: if issubclass(event_cls, ServerRequest): event = cattr.structure(request.params, event_cls) event._id = request.id event._client = self return event elif issubclass(event_cls, ServerNotification): return cattr.structure(request.params, event_cls) else: raise TypeError( "`event_cls` must be a subclass of ServerRequest or ServerNotification" ) if request.method == "window/showMessage": yield structure_request(ShowMessage) elif request.method == "window/showMessageRequest": yield structure_request(ShowMessageRequest) elif request.method == "window/logMessage": yield structure_request(LogMessage) else: raise NotImplementedError(request) else: raise RuntimeError("nobody will ever see this, i hope") def send(self) -> bytes: send_buf = self._send_buf[:] self._send_buf.clear() return send_buf def shutdown(self) -> None: assert self._state == ClientState.NORMAL self._send_request(method="shutdown") self._state = ClientState.WAITING_FOR_SHUTDOWN def exit(self) -> None: assert self._state == ClientState.SHUTDOWN self._send_notification(method="exit") self._state = ClientState.EXITED def cancel_last_request(self) -> None: self._send_notification( method="$/cancelRequest", params={"id": self._id_counter - 1} ) def did_open(self, text_document: TextDocumentItem) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/didOpen", params={"textDocument": cattr.unstructure(text_document)}, ) def did_change( self, text_document: VersionedTextDocumentIdentifier, content_changes: t.List[TextDocumentContentChangeEvent], ) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/didChange", params={ "textDocument": cattr.unstructure(text_document), "contentChanges": cattr.unstructure(content_changes), }, ) def will_save( self, text_document: TextDocumentIdentifier, reason: TextDocumentSaveReason, ) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/willSave", params={ "textDocument": cattr.unstructure(text_document), "reason": cattr.unstructure(reason), }, ) def will_save_wait_until( self, text_document: TextDocumentIdentifier, reason: TextDocumentSaveReason, ) -> None: assert self._state == ClientState.NORMAL self._send_request( method="textDocument/willSaveWaitUntil", params={ "textDocument": cattr.unstructure(text_document), "reason": cattr.unstructure(reason), }, ) def did_save( self, text_document: TextDocumentIdentifier, text: str = None ) -> None: assert self._state == ClientState.NORMAL params = ({"textDocument": cattr.unstructure(text_document)},) if text is not None: params["text"] = text self._send_notification(method="textDocument/didClose", params=params) def did_close(self, text_document: TextDocumentIdentifier) -> None: assert self._state == ClientState.NORMAL self._send_notification( method="textDocument/didClose", params={"textDocument": cattr.unstructure(text_document)}, ) def completions( self, text_document_position: TextDocumentPosition, context: CompletionContext = None, ) -> None: assert self._state == ClientState.NORMAL params = {} params.update(cattr.unstructure(text_document_position)) if context is not None: params.update(cattr.unstructure(context)) self._send_request(method="textDocument/completion", params=params) PK!{433sansio_lsp_client/errors.pyclass IncompleteResponseError(Exception): pass PK!)sansio_lsp_client/events.pyimport typing as t import attr from attr import attrs, attrib from .structs import ( JSONDict, MessageType, MessageActionItem, CompletionItem, CompletionList, TextEdit, ) @attrs class Event: pass @attrs class ServerRequest(Event): _client: "Client" = attrib(init=False) _id: int = attrib(init=False) @attrs class ServerNotification(Event): pass @attrs class Initialized(Event): capabilities: JSONDict = attrib() @attrs class Shutdown(Event): pass @attrs class ShowMessage(ServerNotification): type: MessageType = attrib() message: str = attrib() @attrs class ShowMessageRequest(ServerRequest): type: MessageType = attrib() message: str = attrib() actions: t.Optional[t.List[MessageActionItem]] = attrib() def reply(self, action: MessageActionItem = None) -> None: """ Reply to the ShowMessageRequest with the user's selection. No bytes are actually returned from this method, the reply's bytes are added to the client's internal send buffer. """ self._client._send_response(id=self._id, result=attr.asdict(action)) @attrs class LogMessage(ServerNotification): type: MessageType = attrib() message: str = attrib() @attrs class Completion: completion_list: t.Union[ CompletionList, t.List[CompletionItem], None ] = attrib() # XXX: not sure how to name this event. @attrs class WillSaveWaitUntilEdits: edits: t.Optional[t.List[TextEdit]] = attrib() PK!%^^sansio_lsp_client/io_handler.pyimport cgi import json import typing as t import cattr from .structs import Request, Response, JSONDict from .errors import IncompleteResponseError def _make_headers(content_length: int, encoding: str = "utf-8") -> bytes: headers_bytes = bytearray() headers = { "Content-Length": content_length, "Content-Type": f"application/vscode-jsonrpc; charset={encoding}", } for (key, value) in headers.items(): headers_bytes += f"{key}: {value}\r\n".encode(encoding) headers_bytes += b"\r\n" return headers_bytes def _make_request( method: str, params: JSONDict = None, id: int = None, *, encoding: str = "utf-8", ) -> bytes: request = bytearray() # Set up the actual JSONRPC content and encode it. content: JSONDict = {"jsonrpc": "2.0", "method": method} if params is not None: content["params"] = params if id is not None: content["id"] = id encoded_content = json.dumps(content).encode(encoding) # Write the headers to the request body request += _make_headers( content_length=len(encoded_content), encoding=encoding ) # Append the content to the request request += encoded_content return request def _make_response( id: int, result: JSONDict = None, error: JSONDict = None, *, encoding: str = "utf-8", ) -> bytes: request = bytearray() # Set up the actual JSONRPC content and encode it. content: JSONDict = {"jsonrpc": "2.0", "id": id} if result is not None: content["result"] = result if error is not None: content["error"] = error encoded_content = json.dumps(content).encode(encoding) # Write the headers to the request body request += _make_headers( content_length=len(encoded_content), encoding=encoding ) # Append the content to the request request += encoded_content return request def _parse_messages(response: bytes) -> t.Iterator[t.Union[Response, Request]]: if b"\r\n\r\n" not in response: raise IncompleteResponseError("Incomplete headers") header_lines, raw_content = response.split(b"\r\n\r\n", 1) # Parse the headers. headers = {} for header_line in header_lines.split(b"\r\n"): key, value = header_line.decode("ascii").split(": ", 1) headers[key] = value # We will now parse the Content-Type and Content-Length headers. Since for # version 3.0 of the Language Server Protocol they're the only ones, we can # just verify they're there and not keep them around in the Response # object. assert set(headers.keys()) == {"Content-Type", "Content-Length"} # Content-Type and encoding. content_type, metadata = cgi.parse_header(headers["Content-Type"]) assert content_type == "application/vscode-jsonrpc" encoding = metadata["charset"] # Content-Length content_length = int(headers["Content-Length"]) # We need to verify that the raw_content is long enough, seeing as we might # be getting an incomplete request. if len(raw_content) < content_length: raise IncompleteResponseError( "Not enough bytes to fulfill Content-Length requirements." ) # Take only as many bytes as we need. If there's any remaining, they're # the next response's. raw_content, next_response = ( raw_content[:content_length], raw_content[content_length:], ) def do_it(data: JSONDict) -> t.Union[Response, Request]: del data["jsonrpc"] try: response = cattr.structure(data, Response) return response except TypeError: pass try: request = cattr.structure(data, Request) return request except TypeError: pass raise RuntimeError(f"{data!r} is neither a Request nor a Response!") content = json.loads(raw_content.decode(encoding)) if isinstance(content, list): # This is in response to a batch operation. yield from map(do_it, content) else: yield do_it(content) if next_response: yield from _parse_messages(next_response) PK!c9sansio_lsp_client/structs.pyimport enum import typing as t from attr import attrs, attrib # XXX: Replace the non-commented-out code with what's commented out once nested # types become a thing in mypy. # JSONValue = t.Union[None, str, int, t.List['JSONValue'], t.Dict[str, 'JSONValue']] # JSONDict = t.Dict[str, JSONValue] JSONDict = t.Dict[str, t.Any] # XXX: We can't have this be both str and int due to `cattrs` restrictions. How # can we fix this? # Id = t.Union[str, int] Id = int @attrs class Request: method: str = attrib() id: t.Optional[Id] = attrib(default=None) params: t.Optional[JSONDict] = attrib(default=None) @attrs class Response: id: t.Optional[Id] = attrib(default=None) result: t.Optional[JSONDict] = attrib(default=None) error: t.Optional[JSONDict] = attrib(default=None) class MessageType(enum.IntEnum): ERROR = 1 WARNING = 2 INFO = 3 LOG = 4 @attrs class MessageActionItem: title: str = attrib() @attrs class TextDocumentItem: uri: str = attrib() languageId: str = attrib() version: int = attrib() text: str = attrib() @attrs class TextDocumentIdentifier: uri: str = attrib() @attrs class VersionedTextDocumentIdentifier(TextDocumentIdentifier): version: t.Optional[int] = attrib(default=None) @attrs class Position: # NB: These are both zero-based. line: int = attrib() character: int = attrib() @attrs class Range: start: Position = attrib() end: Position = attrib() def __len__(self) -> int: raise NotImplementedError( "i as a developer am too stupid to figure out how to implement this" ) @attrs class TextDocumentContentChangeEvent: text: str = attrib() range: t.Optional[Range] = attrib(default=None) rangeLength: t.Optional[int] = attrib(default=None) def change_range( cls, change_start: Position, change_end: Position, change_text: str ) -> "TextDocumentContentChangeEvent": change_range = Range(change_start, change_end) return cls( range=change_range, rangeLength=len(change_range), text=change_text ) def change_whole_document( cls, change_text: str ) -> "TextDocumentContentChangeEvent": return cls(text=change_text) @attrs class TextDocumentPosition: textDocument: TextDocumentIdentifier = attrib() position: Position = attrib() class CompletionTriggerKind(enum.IntEnum): INVOKED = 1 TRIGGER_CHARACTER = 2 TRIGGER_FOR_INCOMPLETE_COMPLETIONS = 3 @attrs class CompletionContext: triggerKind: CompletionTriggerKind = attrib() triggerCharacter: t.Optional[str] = attrib(default=None) class MarkupKind(enum.Enum): PLAINTEXT = "plaintext" MARKDOWN = "markdown" @attrs class MarkupContent: kind: MarkupKind = attrib() value: str = attrib() @attrs class TextEdit: range: Range = attrib() newText: str = attrib() @attrs class Command: title: str = attrib() command: str = attrib() arguments: t.Optional[t.List[t.Any]] = attrib(default=None) class InsertTextFormat(enum.IntEnum): PLAIN_TEXT = 1 SNIPPET = 2 @attrs class CompletionItem: label: str = attrib() # TODO: implement CompletionItemKind. kind: t.Optional[int] = attrib(default=None) detail: t.Optional[str] = attrib(default=None) # FIXME: Allow `t.Union[str, MarkupContent]` here by defining a cattrs custom # loads hook. documentation: t.Optional[str] = attrib(default=None) deprecated: t.Optional[bool] = attrib(default=None) preselect: t.Optional[bool] = attrib(default=None) sortText: t.Optional[str] = attrib(default=None) filterText: t.Optional[str] = attrib(default=None) insertText: t.Optional[str] = attrib(default=None) insertTextFormat: t.Optional[InsertTextFormat] = attrib(default=None) textEdit: t.Optional[TextEdit] = attrib(default=None) additionalTextEdits: t.Optional[t.List[TextEdit]] = attrib(default=None) commitCharacters: t.Optional[t.List[str]] = attrib(default=None) command: t.Optional[Command] = attrib(default=None) data: t.Optional[t.Any] = attrib(default=None) @attrs class CompletionList: isIncomplete: bool = attrib() items: t.List[CompletionItem] = attrib() class TextDocumentSaveReason(enum.IntEnum): MANUAL = 1 AFTER_DELAY = 2 FOCUS_OUT = 3 PK!H_zTT'sansio_lsp_client-0.1.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]n0H*J>mlcAPK!HOZ<;*sansio_lsp_client-0.1.0.dist-info/METADATAKK@+R >B[,ئXO<<{'!vpaVE-H:2:+B( Ot]RIB JyS J 0@"$|$4w{8xZnJ2ra$4Ƃb]n#iM5yc9{2 N[\? ̓Cl-A%HAqm,v-](,5NM=H51 y]{_Қ yKg >d?+ϺGr>up1f<M;^3 JyG}PK!H5-M=(sansio_lsp_client-0.1.0.dist-info/RECORDK@ph txʣm`~Nɮᰗꢦ+[;Jp5;BeS+_oͧj7` ĪH33OmJu -*ol)n,k2|եnnfɬҧ8?$C%v:`X\#ܒ"3ئdǧt/&5QIj'#8CP炈ټ~trXZ1>ٳϯ*z-8 [R&'BbfF2V@ħIqF^\i=,ҙHZ 9@ v;yz2$ߚ͹t>kaz.f=7B^h`P<'p?QYJˮ&o߃Mvtf=`XG&!_ԊPsBI))M#!YS޻s_^pS?^pPK!#+sansio_lsp_client/__init__.pyPK!́ sansio_lsp_client/__main__.pyPK!yp&p& sansio_lsp_client/client.pyPK!{4333sansio_lsp_client/errors.pyPK!)3sansio_lsp_client/events.pyPK!%^^ :sansio_lsp_client/io_handler.pyPK!c9Jsansio_lsp_client/structs.pyPK!H_zTT'[sansio_lsp_client-0.1.0.dist-info/WHEELPK!HOZ<;*y\sansio_lsp_client-0.1.0.dist-info/METADATAPK!H5-M=(]sansio_lsp_client-0.1.0.dist-info/RECORDPK )`