---- dataland/__init__.py ----

from ._meta import __version__, __author__

---- dataland/_meta.py ----

__author__ = "Ryan"
__version__ = "0.1.4"

---- dataland/base.py ----

import os
from typing import Any, Union

import attr


@attr.s(frozen=True, auto_attribs=True)
class DataUri:
    basename: str = attr.ib(converter=str)
    prefix: str = attr.ib(converter=str)

    @property
    def fullpath(self):
        return os.path.join(self.prefix, self.basename)


@attr.s(frozen=True, auto_attribs=True)
class DataPath(DataUri):
    pass


@attr.s(frozen=True, auto_attribs=True)
class DataTable:
    dataset: str
    tablename: str
    query: str = ""


class DataFormat:
    TEXT = "txt"
    JSON = "json"
    PARQUET = "parquet"
    CSV = "csv"


@attr.s(frozen=True, auto_attribs=True)
class Data:
    name: str
    source: Union[DataUri, DataTable]
    schema: Any = None
    format: str = DataFormat.JSON  # noqa
    description: str = ""

    def to_dict(self):
        return attr.asdict(self)


class DataNode:
    def __init__(
        self,
        name: str,
        source: Union[DataUri, DataTable],
        schema: Any = None,
        format: str = DataFormat.JSON,
        description: str = "",
    ):
        self._data = Data(
            name=name,
            source=source,
            schema=schema,
            format=format,
            description=description,
        )

    @property
    def data(self) -> Data:
        return self._data

    def load(self):
        raise NotImplementedError()

    def dump(self):
        raise NotImplementedError()

    def __repr__(self):
        return f"DataNode.{repr(self._data)}"

    def __eq__(self, other: "DataNode"):
        return self.data == other.data and self.__class__ == other.__class__


class DataLink:
    def __init__(
        self,
        delegate_cls: DataNode,
        name: str,
        source: Union[DataUri, DataTable],
        link: Union[DataUri, DataTable],
        schema: Any = None,
        format: str = DataFormat.JSON,
        description: str = "",
    ):
        self._node = delegate_cls(
            name=name,
            source=source,
            schema=schema,
            format=format,
            description=description,
        )
        self._link = delegate_cls(
            name=name,
            source=link,
            schema=schema,
            format=format,
            description=description,
        )

    @property
    def node(self) -> DataNode:
        return self._node

    @property
    def link(self) -> DataNode:
        return self._link
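Below is a minimal usage sketch of the metadata classes in `base.py`. The bucket, dataset, and file names are hypothetical; `DataNode` itself only carries metadata, and `load`/`dump` are implemented by the cloud- and Spark-specific subclasses later in the package.

from dataland.base import DataFormat, DataNode, DataPath, DataTable

# A file-like source: `fullpath` joins prefix and basename with os.path.join.
users_path = DataPath(basename="users.json", prefix="gs://my-bucket/raw")
print(users_path.fullpath)  # gs://my-bucket/raw/users.json

# A table-like source for warehouse-backed nodes (e.g. BigQuery).
users_table = DataTable(dataset="analytics", tablename="users")

# The base node only holds the immutable `Data` record.
node = DataNode(
    name="users",
    source=users_path,
    format=DataFormat.JSON,
    description="raw user profiles",
)
print(node.data.to_dict())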
---- dataland/gcloud/README.md ----

# API wrappers for Google Cloud Client

## Storage

## Dataproc

## TODO

---- dataland/gcloud/__init__.py ----

---- dataland/gcloud/bigquery/__init__.py ----

from .base import BqDataNode

__all__ = ['BqDataNode']

---- dataland/gcloud/bigquery/base.py ----

import time
from typing import Any, List, Union

from google.cloud import bigquery

from ...base import DataNode


class BqDataNode(DataNode):
    def load(self):
        raise NotImplementedError()

    def dump(
        self,
        uris: Union[str, List[str]],
        schema: Any = None,
        destination: Any = None,
        create_disposition: str = "CREATE_IF_NEEDED",
        write_disposition: str = "WRITE_TRUNCATE",
    ) -> bigquery.LoadJob:
        client = bigquery.Client()
        dataset_ref = client.dataset(self.data.source.dataset)
        table_ref = dataset_ref.table(self.data.source.tablename)

        job_config = bigquery.LoadJobConfig()
        job_config.source_format = self.data.format
        job_config.schema = schema or self.data.schema
        job_config.create_disposition = create_disposition
        job_config.write_disposition = write_disposition

        is_valid_uris = isinstance(uris, str) or (
            isinstance(uris, list) and all(isinstance(uri, str) for uri in uris)
        )
        if not is_valid_uris:
            raise ValueError("`uris` contains invalid values")

        load_job = client.load_table_from_uri(
            uris, destination=destination or table_ref, job_config=job_config
        )  # API request
        # elif isinstance(pdf, pd.DataFrame):
        #     load_job = client.load_table_from_dataframe(
        #         pdf, destination=destination or table_ref, job_config=job_config
        #     )  # API request
        return load_job

    def copy_to(
        self,
        dest_node: "BqDataNode",
        create_disposition: str = "CREATE_IF_NEEDED",
        write_disposition: str = "WRITE_TRUNCATE",
    ) -> bigquery.CopyJob:
        client = bigquery.Client()
        src_ref = client.dataset(self.data.source.dataset).table(
            self.data.source.tablename
        )  # yapf: disable
        dest_ref = client.dataset(dest_node.data.source.dataset).table(
            dest_node.data.source.tablename
        )  # yapf: disable

        job_config = bigquery.CopyJobConfig()
        job_config.create_disposition = create_disposition
        job_config.write_disposition = write_disposition

        copy_job = client.copy_table(src_ref, dest_ref, job_config=job_config)
        return copy_job

    @classmethod
    def wait(
        cls,
        job_id: str,
        target_state: str = "DONE",
        interval: int = 10,
        timeout: int = 600,
        verbose: bool = True,
    ) -> bigquery.job._AsyncJob:
        tic = time.time()
        client = bigquery.Client()
        while True:
            try:
                job: bigquery.job._AsyncJob = client.get_job(job_id)
                toc = time.time()
                if verbose:
                    print(
                        (
                            f'[BQ-JOB] "{job.state}" >> "{target_state}",'
                            f" {toc - tic:.3f} seconds elapsed"
                        ),
                        flush=True,
                    )
                if job.state == target_state:
                    break
                if (toc - tic) > timeout:
                    raise TimeoutError(
                        f"bigquery operation reaches time out: {timeout} seconds"
                    )
                time.sleep(interval)
            except Exception as err:
                raise err
        return job
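A hedged usage sketch for `BqDataNode`, assuming hypothetical dataset, table, and bucket names and valid GCP credentials. Note that the BigQuery load API expects a source-format string such as "NEWLINE_DELIMITED_JSON", so the node is created with that instead of the package's `DataFormat.JSON` default.

from dataland.base import DataTable
from dataland.gcloud.bigquery import BqDataNode

events = BqDataNode(
    name="events",
    source=DataTable(dataset="analytics", tablename="events"),
    format="NEWLINE_DELIMITED_JSON",  # forwarded to LoadJobConfig.source_format
)

# Load newline-delimited JSON files from GCS into the table ...
load_job = events.dump(uris="gs://my-bucket/staging/events-*.json")
BqDataNode.wait(load_job.job_id, target_state="DONE", interval=15)

# ... or copy the table into another node's table.
backup = BqDataNode(
    name="events_backup",
    source=DataTable(dataset="analytics_backup", tablename="events"),
)
copy_job = events.copy_to(backup)
BqDataNode.wait(copy_job.job_id)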
---- dataland/gcloud/dataproc/__init__.py ----

from .api import DataprocApi

__all__ = ['DataprocApi']

---- dataland/gcloud/dataproc/_data.py ----

import copy
import json
from typing import Any, Dict, List, Optional

import attr
from cattr import structure_attrs_fromdict

attrs = attr.s
attrib = attr.ib


class _DataMixin:
    def to_dict(self, **kwargs) -> Dict:
        return attr.asdict(self, **kwargs)

    def to_json(self, **kwargs) -> str:
        return json.dumps(self.to_dict(), **kwargs)


@attrs(slots=True, frozen=True, auto_attribs=True)
class _Status:
    state: str
    innerState: str
    stateStartTime: str


@attrs(slots=True, frozen=True, auto_attribs=True)
class _Meta:
    _at_type: str
    clusterName: str
    clusterUuid: str
    operationType: str
    description: str
    status: _Status


@attrs(slots=True, frozen=True, auto_attribs=True)
class Operation:
    """[Reference](https://cloud.google.com/dataproc/docs/reference/rest/Shared.Types/ListOperationsResponse#Operation)"""

    name: str
    metadata: _Meta
    done: Optional[bool] = None
    result: Any = None

    @classmethod
    def from_dataproc_api(cls, incoming: Dict):
        cloned: Dict = copy.deepcopy(incoming)
        cloned["metadata"]["at_type"] = cloned["metadata"].pop("@type")
        return structure_attrs_fromdict(cloned, cls)


@attrs(slots=True, auto_attribs=True)
class _DiskConfig:
    bootDiskSizeGb: int = 1000
    bootDiskType: str = "pd-standard"
    numLocalSsds: int = 0


@attrs(slots=True, auto_attribs=True)
class _MachineConfig:
    numInstances: int = 1
    machineTypeUri: str = "n1-standard-4"
    diskConfig: _DiskConfig = attrib(factory=_DiskConfig)
    instanceNames: List[str] = attrib(factory=list)
    imageUri: str = ""


@attrs(slots=True, auto_attribs=True)
class _GceCusterConfig:
    zoneUri: str = ""
    subnetworkUri: str = "default"
    # serviceAccountScopes: List[str]  # TODO


@attrs(slots=True, auto_attribs=True)
class _InitAction:
    executableFile: str = ""


@attrs(slots=True, auto_attribs=True)
class _SoftwareConfig:
    imageVersion: str = ""
    properties: Dict = attrib(factory=dict)


@attrs(slots=True, auto_attribs=True)
class _Cluster:
    configBucket: str = ""
    gceClusterConfig: _GceCusterConfig = attrib(factory=_GceCusterConfig)
    masterConfig: _MachineConfig = attrib(factory=_MachineConfig)
    workerConfig: _MachineConfig = attrib(factory=_MachineConfig)
    initializationActions: List[_InitAction] = attrib(factory=list)
    # softwareConfig: _SoftwareConfig


@attrs(slots=True, auto_attribs=True)
class DataprocSpec(_DataMixin):
    """[Reference](https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.clusters#Cluster)"""

    projectId: str
    clusterName: str
    config: _Cluster = attrib(factory=_Cluster)


@attrs(slots=True, auto_attribs=True)
class _PysparkJob(_DataMixin):
    mainPythonFileUri: str = ""
    args: List[str] = attrib(factory=list)
    pythonFileUris: List[str] = attrib(factory=list)
    jarFileUris: List[str] = attrib(factory=list)
    fileUris: List[str] = attrib(factory=list)
    archiveUris: List[str] = attrib(factory=list)
    properties: Dict[str, str] = attrib(factory=dict)
    # loggingConfig: Any


@attrs(slots=True, auto_attribs=True)
class _JobPlace:
    clusterName: str = ""
    clusterUuid: str = ""


@attrs(slots=True, auto_attribs=True)
class _JobRef:
    projectId: str = ""
    jobId: str = ""


@attrs(slots=True, auto_attribs=True)
class _Job:
    pysparkJob: _PysparkJob = attrib(factory=_PysparkJob)
    placement: _JobPlace = attrib(factory=_JobPlace)
    reference: _JobRef = attrib(factory=_JobRef)


@attrs(slots=True, auto_attribs=True)
class JobSpec:
    """[Reference](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs#Job)"""

    job: _Job = attrib(factory=_Job)
    requestId: str = ""

---- dataland/gcloud/dataproc/api.py ----

import time
from typing import Callable, Dict, Iterable, List

from box import Box
from googleapiclient import discovery


class _Api:
    def __init__(self, project_id: str, region: str):
        self.project_id = project_id
        self.region = region


class _Jobs(_Api):
    @property
    def clusters(self) -> "_Clusters":
        return self._api_root.clusters

    def __init__(self, api_root, **kwargs):
        super().__init__(**kwargs)
        self._jobs = api_root._dataproc.projects().regions().jobs()
        self._api_root = api_root

    def lz_submit(self, body: Dict, callbacks: Iterable[Callable] = None):
        def _submit():
            results = []
            submitted = self.submit(body).execute()
            results.append(submitted)
            if callbacks:
                if isinstance(callbacks, Callable):
                    results.append(callbacks(submitted))
                elif isinstance(callbacks, Iterable):
                    for cb in callbacks:
                        results.append(cb(submitted))
                else:
                    raise ValueError(
                        f"`callbacks` should be Iterable[Callable], got: {callbacks}"
                    )
            return results

        self._api_root._requests.append(_submit)
        return self

    def lz_wait(
        self,
        job_id: str,
        target_state: str,
        interval: int = 60,
        timeout: int = 36000,
        verbose: bool = True,
        error_states: List[str] = ["ERROR", "CANCELLED"],
    ):
        def _wait():
            return self.wait(
                job_id,
                target_state,
                interval=interval,
                timeout=timeout,
                verbose=verbose,
                error_states=error_states,
            )

        self._api_root._requests.append(_wait)
        return self

    def cancel(self):
        # TODO
        pass

    def delete(self):
        pass

    def get(self, job_id: str):
        return self._jobs.get(
            projectId=self.project_id, region=self.region, jobId=job_id
        )

    def list(self):
        # TODO
        pass

    def submit(self, body: Dict):
        return self._jobs.submit(
            projectId=self.project_id, region=self.region, body=body
        )

    def update(self):
        # TODO
        pass

    def wait(
        self,
        job_id: str,
        target_state: str,
        interval: int = 60,
        timeout: int = 36000,
        verbose: bool = True,
        error_states: List[str] = ["ERROR", "CANCELLED"],
    ):
        """Wait for the job to reach some target state.

        Params:
            `job_id`: specify the target job to be monitored
            `target_state`: state string returned by the `jobs.get` api; wait until the
                target job reaches this state, e.g. we expect it will eventually reach
                the "DONE" state after submitting a new job.
            `timeout`: interrupt waiting and raise `TimeoutError` after the elapsed time
                exceeds this threshold, default: 36000 (10hr)
            `interval`: waiting interval between consecutive api requests, in seconds
            `error_states`: interrupt waiting and raise `RuntimeError` when the job's
                status reaches one of these states
        """
        tic = time.time()
        # TODO: set maximum timeout
        while True:
            try:
                request = self.get(job_id)
                status = Box(request.execute()["status"])
                toc = time.time()
                if verbose:
                    print(
                        (
                            f'[JOB] "{status.state}" >> "{target_state}",'
                            f" {toc - tic:.3f} seconds elapsed"
                        ),
                        flush=True,
                    )
                if status.state == target_state:
                    break
                elif status.state in error_states:
                    raise RuntimeError(
                        f'Unexpected "{status.state}" state, please check the log of'
                        f' the job on the Dataproc browser for details, job_id: "{job_id}"'
                    )
                if (toc - tic) > timeout:
                    raise TimeoutError(
                        f"Job execution reaches time out: {timeout} seconds"
                    )
                time.sleep(interval)
            except Exception as err:
                raise err

    def execute(self) -> List:
        """Execute requests sequentially in pipeline"""
        return self._api_root.execute()


class _Clusters(_Api):
    @property
    def jobs(self) -> _Jobs:
        return self._api_root.jobs

    def __init__(self, api_root, **kwargs):
        super().__init__(**kwargs)
        self._clusters = api_root._dataproc.projects().regions().clusters()
        self._api_root = api_root

    def reset_requests(self):
        self._api_root.reset_requests()
        return self

    def lz_create(self, body: Dict, callbacks: Iterable[Callable] = None):
        self._api_root._requests.append(self.create(body, callbacks).execute)
        return self

    def lz_wait(
        self,
        cluster_name: str,
        target_state: str,
        interval: int = 5,
        fail_ok: bool = False,
        verbose: bool = True,
    ):
        def _wait():
            return self.wait(
                cluster_name,
                target_state,
                interval=interval,
                fail_ok=fail_ok,
                verbose=verbose,
            )

        self._api_root._requests.append(_wait)
        return self

    def lz_get(self, cluster_name: str):
        self._api_root._requests.append(self.get(cluster_name).execute)
        return self

    def lz_delete(self, cluster_name: str, callbacks: Iterable[Callable] = None):
        self._api_root._requests.append(self.delete(cluster_name, callbacks).execute)
        return self

    def execute(self) -> List:
        """Execute requests sequentially in pipeline"""
        return self._api_root.execute()

    def create(self, body: Dict, callbacks: Iterable[Callable] = None):
        request = self._clusters.create(
            projectId=self.project_id, region=self.region, body=body
        )
        if callbacks:
            if isinstance(callbacks, Callable):
                request.add_response_callback(callbacks)
            elif isinstance(callbacks, Iterable):
                for cb in callbacks:
                    request.add_response_callback(cb)
            else:
                raise ValueError(
                    f"`callbacks` should be Iterable[Callable], got: {callbacks}"
                )
        return request

    def delete(self, cluster_name: str, callbacks: Iterable[Callable] = None):
        request = self._clusters.delete(
            projectId=self.project_id, region=self.region, clusterName=cluster_name
        )
        if callbacks:
            if isinstance(callbacks, Callable):
                request.add_response_callback(callbacks)
            elif isinstance(callbacks, Iterable):
                for cb in callbacks:
                    request.add_response_callback(cb)
            else:
                raise ValueError(
                    f"`callbacks` should be Iterable[Callable], got: {callbacks}"
                )
        return request

    def diagnose(self):
        # TODO
        pass

    def get(self, cluster_name: str):
        return self._clusters.get(
            projectId=self.project_id, region=self.region, clusterName=cluster_name
        )

    def list(self):
        # TODO
        pass

    def patch(self):
        # TODO
        pass

    def wait(
        self,
        cluster_name: str,
        target_state: str,
        interval: int = 10,
        timeout: int = 300,
        fail_ok: bool = False,
        verbose: bool = True,
    ) -> None:
        """Wait for the dataproc cluster to reach the target status.

        Params:
            `cluster_name`: the target cluster to be monitored
            `target_state`: state string returned by the `clusters.get` api; wait until
                the target cluster reaches this state, e.g. we expect it will eventually
                reach the "RUNNING" state after creating a new cluster.
            `interval`: waiting interval between consecutive api requests, in seconds.
            `timeout`: interrupt waiting and raise `TimeoutError` after the elapsed time
                exceeds this threshold, default: 300 (5min)
            `fail_ok`: ignore the request exception, set to `True` when we expect the
                cluster is about to die.
        """
        tic = time.time()
        # TODO: set maximum timeout
        while True:
            try:
                request = self.get(cluster_name)
                status = Box(request.execute()["status"])
                toc = time.time()
                if verbose:
                    print(
                        (
                            f'[CLUSTER] "{status.state}" >> "{target_state}",'
                            f" {toc - tic:.3f} seconds elapsed"
                        ),
                        flush=True,
                    )
                if status.state == target_state:
                    break
                if status.state == "ERROR":
                    raise RuntimeError(
                        f'Unexpected "{status.state}" state.'
                        f" Please check the log of the cluster on the Dataproc browser"
                        f' for details, cluster_name: "{cluster_name}"'
                    )
                if (toc - tic) > timeout:
                    raise TimeoutError(
                        f"Cluster execution reaches time out: {timeout} seconds"
                    )
                time.sleep(interval)
            except Exception as err:
                if fail_ok:
                    break
                else:
                    raise err


class DataprocApi(_Api):
    _apis = {}

    def __init__(
        self, project_id: str, region: str, api_version: str = "v1", credentials=None
    ):
        """Dataproc API wrapper.

        Params:
            `project_id`: mapping to the `projectId` parameter in the dataproc api
            `region`: mapping to the `region` parameter in the dataproc api
        """
        super().__init__(project_id, region)
        creds = credentials or self._default_credentials
        dataproc = discovery.build("dataproc", api_version, credentials=creds)

        self._credentials = creds
        self._dataproc = dataproc
        self._version = api_version
        self._namespace = (self._version, self._credentials)
        if self._namespace not in DataprocApi._apis:
            DataprocApi._apis[self._namespace] = {}

        self._requests = []
        self._responses = {}

    def reset_requests(self):
        self._requests = []

    def get_response(self):
        pass

    @property
    def _default_credentials(self):
        from google.auth import _default

        return _default.default()[0]

    @property
    def clusters(self) -> _Clusters:
        if "_clusters" not in DataprocApi._apis[self._namespace]:
            DataprocApi._apis[self._namespace]["_clusters"] = _Clusters(
                self, project_id=self.project_id, region=self.region
            )
        return DataprocApi._apis[self._namespace]["_clusters"]

    @property
    def jobs(self):
        if "_jobs" not in DataprocApi._apis[self._namespace]:
            DataprocApi._apis[self._namespace]["_jobs"] = _Jobs(
                self, project_id=self.project_id, region=self.region
            )
        return DataprocApi._apis[self._namespace]["_jobs"]

    def execute(self) -> List:
        """Execute requests sequentially in pipeline"""
        results = []
        if isinstance(self._requests, Iterable):
            for ex in self._requests:
                if not isinstance(ex, Callable):
                    raise TypeError(
                        "All `executable` objects in the request-pipeline should be"
                        f" callable, got: {ex}"
                    )
        while len(self._requests) > 0:
            execute = self._requests.pop(0)
            results.append(execute())
        return results

    def create(self, body: Dict, callbacks: Iterable[Callable] = None):
        # Shortcut that delegates to the clusters sub-api (see `_Clusters.create`).
        return self.clusters.create(body, callbacks=callbacks)
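A sketch of how the lazy `lz_*` pipeline above appears intended to be chained. The project, region, cluster, and bucket names are hypothetical, the spec helpers come from the private `_data` module, and a real run needs Dataproc permissions; nothing is sent until `execute()` runs the queued requests in order.

import attr

from dataland.gcloud.dataproc import DataprocApi
from dataland.gcloud.dataproc._data import DataprocSpec, JobSpec

api = DataprocApi(project_id="my-project", region="us-central1")

cluster_spec = DataprocSpec(projectId="my-project", clusterName="etl-cluster")
job_spec = JobSpec()
job_spec.job.placement.clusterName = "etl-cluster"
job_spec.job.pysparkJob.mainPythonFileUri = "gs://my-bucket/jobs/etl.py"

# Queue: create cluster -> wait until RUNNING -> submit the PySpark job.
results = (
    api.clusters.lz_create(cluster_spec.to_dict())
    .lz_wait("etl-cluster", target_state="RUNNING")
    .jobs.lz_submit(attr.asdict(job_spec))
    .execute()
)

# The submit response (a Job resource) should carry the generated job id,
# which jobs.wait can then poll until completion.
job_id = results[-1][0]["reference"]["jobId"]
api.jobs.wait(job_id, target_state="DONE")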
---- dataland/gcloud/storage/__init__.py ----

from .base import GcsDataNode

__all__ = ['GcsDataNode']

---- dataland/gcloud/storage/base.py ----

import os
import re
import tempfile
from typing import Tuple

from google.cloud import storage

from ...base import DataNode
from ...utils import deprecated


class GcsDataNode(DataNode):
    @deprecated
    def _to_tempfile(self, content: str) -> str:
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
            f.write(content)
            abs_path = f.name
        return abs_path

    def _get_blob_meta(self, gs_uri: str) -> Tuple[str, str]:
        """Extract `bucketname` and `blob_path` from a gs-uri

        Return:
            tuple: (bucketname, blob_path)
        """
        try:
            matched = re.search(r"gs:\/\/(?P<bucketname>[^:\/ ]+)\/(.+)", gs_uri)
            bucketname, blob_path = matched.group(1), matched.group(2)
        except Exception as err:
            print(
                (
                    "Invalid URI value, should match the pattern:"
                    fr' `gs:\/\/(?P<bucketname>[^:\/ ]+)\/(.+)`, got: "{gs_uri}"'
                )
            )
            raise err
        return (bucketname, blob_path)

    def _get_blob(self, gs_uri: str) -> storage.Blob:
        """Retrieve the blob object by the `google.cloud.storage` api"""
        bucketname, blob_path = self._get_blob_meta(gs_uri)
        client = storage.Client()
        bucket = client.get_bucket(bucketname)
        blob = bucket.blob(blob_path)
        return blob

    def copy_to(self, dest_node: "GcsDataNode"):
        from google.cloud import storage

        client = storage.Client()
        src: str = self.data.source.fullpath
        dest: str = dest_node.data.source.fullpath

        src_bucket_name, src_blob_path = self._get_blob_meta(src)
        dest_bucket_name, dest_blob_path = self._get_blob_meta(dest)

        src_bucket = client.get_bucket(src_bucket_name)
        dest_bucket = client.get_bucket(dest_bucket_name)
        src_blob = src_bucket.blob(src_blob_path)
        src_bucket.copy_blob(src_blob, dest_bucket, new_name=dest_blob_path)

    def load(self, fullpath: str = "", mode: str = "wb") -> str:
        """Download data from gcs to a local temp file by the `google.cloud.storage` api

        Params:
            mode: mode for operating temp files, default: 'wb'.
            fullpath: full file path on gcs, should start with `gs://...`, optional.
        """
        fullpath: str = fullpath or self.data.source.fullpath
        blob = self._get_blob(fullpath)
        with tempfile.NamedTemporaryFile(mode=mode, delete=False) as f:
            blob.download_to_file(f)
            abs_path = f.name
        return abs_path

    def dump(
        self, path: str = None, content: str = "", fullpath: str = "", mode: str = "w"
    ) -> None:
        """Dump data to gcs through the `google.cloud.storage` api.

        Params:
            path: dump data from a source (local) file path.
                NOTE: instead of dumping a directory, archive the directory into a
                single file, then dump.
            content: if a local `path` is not provided, dump the content into a
                tempfile, then dump to gcs.
            mode: mode for operating temp files, default: 'w'.
            fullpath: full file path on gcs, should start with `gs://...`, optional.

        Return:
            None
        """
        # If `path` is provided, ignore the `content` parameter.
        if path:
            if not os.path.isfile(path):
                raise ValueError(f'Invalid file `path`, got: "{path}"')
        else:
            with tempfile.NamedTemporaryFile(mode=mode, delete=False) as f:
                f.write(content)
                path = f.name

        fullpath: str = fullpath or self.data.source.fullpath
        blob = self._get_blob(fullpath)
        blob.upload_from_filename(path)
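A usage sketch for `GcsDataNode`, with hypothetical bucket and object names and assuming GCS credentials are available.

from dataland.base import DataFormat, DataPath
from dataland.gcloud.storage import GcsDataNode

report = GcsDataNode(
    name="report",
    source=DataPath(basename="report.txt", prefix="gs://my-bucket/daily"),
    format=DataFormat.TEXT,
)

# Upload in-memory content (written to a temp file first), ...
report.dump(content="all good\n")

# ... download it back to a local temp file, ...
local_path = report.load()
print(open(local_path).read())

# ... or copy the blob to another node's location.
archive = GcsDataNode(
    name="report-archive",
    source=DataPath(basename="report.txt", prefix="gs://my-archive-bucket/daily"),
)
report.copy_to(archive)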
---- dataland/nlp/__init__.py ----

from ._string import String
from .utils import halfwidth, normalize, split_and_join, unigram

__all__ = ["String", "halfwidth", "normalize", "unigram", "split_and_join"]

---- dataland/nlp/_string.py ----

from functools import wraps
from typing import Any, Callable, Union

import regex

from ..utils import qualname


class String:
    """Alternative type for `str` that makes nlp processing more fluent; the biggest
    benefits are `replace` and `apply`.

    `replace`: Supports pattern replacement with `regex`, which is more powerful than
        the builtin `re` package
    `apply`: Applies an arbitrary function to process the string

    Examples:
        >>> text = String('abcd')
        >>> text.replace(r'[ac]', lambda x: x.group(0).upper()).to_str()
        'AbCd'
        >>> text.upper().startswith('A')  # NOTE: returns a boolean
        True
        >>> text.apply(lambda x: 'A' * len(x)).to_str()
        'AAAA'
    """

    @property
    def value(self) -> str:
        return self._value

    def __init__(self, string):
        self._value = string

    def _wrap(self, func: Callable[[Any], str], *args, **kwargs) -> Callable:
        @wraps(func)
        def _inner(*args, **kwargs):
            return self.apply(func, *args, **kwargs)

        return _inner

    def __getattr__(self, name: str, *args, **kwargs):
        func = getattr(str, name, None)
        if callable(func):
            return self._wrap(func, *args, **kwargs)
        raise AttributeError(f"`String` object has no attribute `{name}`")

    def replace(self, pattern: str, repl: str, **kwargs) -> "String":
        """Replace the string by pattern match

        Return the string obtained by replacing the leftmost (or rightmost with a
        reverse pattern) non-overlapping occurrences of the `pattern` in the string
        by the replacement `repl`

        Args:
            pattern (str): Pattern to match in the string, can be a regular expression
            repl (str): Can be either a string or a callable; if a string, backslash
                escapes in it are processed; if a callable, it's passed the match
                object and must return a replacement string to be used.
            **kwargs: Other keyword arguments fed into `regex.sub`

        Return:
            replaced string (String)

        References:
            regex: https://pypi.org/project/regex/
        """
        return self.__class__(regex.sub(pattern, repl, self._value, **kwargs))

    def apply(
        self, func: Callable, *args, allow_any: bool = True, **kwargs
    ) -> Union["String", Any]:
        """Apply an arbitrary function to process the string

        Args:
            func (Callable): function to apply
            allow_any (bool): If set to False, raise `TypeError` when the return value
                of the function is not a `str`

        Returns:
            processed string (String)
        """
        r = func(self._value, *args, **kwargs)
        if isinstance(r, str):
            return self.__class__(r)
        elif allow_any:
            return r
        else:
            raise TypeError(f"`{qualname(func)}` doesn't return `str`")

    def to_str(self) -> str:
        """Convert `String` back to `str`."""
        return self._value

    def __str__(self):
        return self.to_str()

    def __repr__(self):
        return f"<String: {self._value}>"

---- dataland/nlp/utils.py ----

import unicodedata

import regex
from zhon import hanzi, zhuyin

from ..utils import deprecated

ZH_PATTERNS = rf"[{hanzi.characters+zhuyin.characters}]"


def split_and_join(text: str, splitter: str = "", joiner: str = "") -> str:
    """Split the string first, then join it back into a new string by the specified delimiter."""
    return joiner.join(text.split(splitter or None))


def halfwidth(text: str, form="NFKD") -> str:
    """Convert the string to halfwidth"""
    return unicodedata.normalize(form, text)


@deprecated(replaced_by=halfwidth)
def _halfwidth(text: str) -> str:
    """Convert the string to halfwidth.

    Full-width characters' unicodes range from 65281 to 65374 (0xFF01 - 0xFF5E in hex);
    half-width characters' unicodes range from 33 to 126 (0x21 - 0x7E in hex);
    `space` in full-width: 12288 (0x3000), in half-width: 32 (0x20).

    Since the unicode difference is fixed between the full- and half-width forms of a
    single character, convert the character to half-width by numeric shifting, and
    handle `space` as a special case.
    """
    rstring = ""
    for char in text:
        code = ord(char)
        if code == 0x3000:
            code = 0x0020
        else:
            code -= 0xFEE0
        if code < 0x0020 or code > 0x7E:  # fallback check
            rstring += char
        else:
            rstring += chr(code)
    return rstring


def normalize(text: str, form="NFKD") -> str:
    """Return the normal form `form` of the string, with combining marks removed."""
    return regex.sub(r"\p{M}", "", halfwidth(text, form))


def unigram(text: str) -> str:
    """Separate each [`Han` character](https://en.wikipedia.org/wiki/CJK_characters)
    with a space character in sentences

    Args:
        text (str)

    Returns:
        str

    Examples:
        >>> unigram("abc一二三cde efg四五六")
        'abc 一 二 三 cde efg 四 五 六'
    """
    return split_and_join(
        regex.sub(r"({})".format(ZH_PATTERNS), r" \1 ", text), joiner=" "
    )
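A small sketch of the nlp helpers above on full-width input; the sample strings are arbitrary.

from dataland.nlp import String, halfwidth, normalize, split_and_join, unigram

text = "Ｃａｆé ＡＢＣ一二三"  # full-width Latin letters plus an accented character
print(halfwidth(text))           # 'Café ABC一二三' (full-width folded to half-width)
print(normalize(text))           # 'Cafe ABC一二三' (combining marks stripped as well)
print(unigram(normalize(text)))  # 'Cafe ABC 一 二 三'

# `String` chains regex replacement and arbitrary callables fluently.
cleaned = (
    String("Ｈｅｌｌｏ，　ＮＬＰ！！")
    .apply(normalize)                   # -> 'Hello, NLP!!'
    .replace(r"[!,]+", " ")             # punctuation runs become spaces
    .apply(split_and_join, joiner=" ")  # collapse whitespace
    .to_str()
)
print(cleaned)  # 'Hello NLP'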
---- dataland/spark.py ----

import os
import types
from functools import wraps
from typing import List, Optional, Union

import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StringType

from .base import DataNode


class SparkDFNode(DataNode):
    def _default_spark_config(self, **configs):
        scfg = SparkConf()
        user = os.environ.get("USER", "")
        scfg.set(
            "spark.sql.warehouse.dir", os.path.join(f"/tmp/{user}", "spark-warehouse")
        )
        for k, v in configs.items():
            scfg.set(k, v)
        return scfg

    def load(
        self,
        fullpaths: Union[str, List[str]] = None,
        ss: Optional[SparkSession] = None,
        scfg: Optional[SparkConf] = None,
        **read_options,
    ) -> DataFrame:
        if ss is None:
            cfg = dict(scfg.getAll()) if isinstance(scfg, SparkConf) else {}
            scfg = self._default_spark_config(**cfg)
            ss = SparkSession.builder.config(conf=scfg).getOrCreate()
        if "format" not in read_options:
            read_options["format"] = self.data.format
        if "schema" not in read_options:
            read_options["schema"] = self.data.schema
        return ss.read.load(fullpaths or self.data.source.fullpath, **read_options)

    def dump(self, sdf: DataFrame, destination=None, **write_options) -> None:
        """Write the DataFrame to the destination path with the node's format."""
        if "mode" not in write_options:
            write_options["mode"] = "overwrite"
        if "format" not in write_options:
            write_options["format"] = self.data.format
        sdf.write.save(destination or self.data.source.fullpath, **write_options)


def _copy_func(f, name: str = ""):
    """Copy a function object.

    Args:
        name (str): Give the copied function a new name, or use the original
            function's name

    Returns:
        Copied function object
    """
    fn = types.FunctionType(
        f.__code__, f.__globals__, name or f.__name__, f.__defaults__, f.__closure__
    )
    # In case f was given attrs (note this dict is a shallow copy):
    fn.__dict__.update(f.__dict__)
    return fn


def udfy(fn=None, return_type=StringType()):
    """A wrapper to provide an additional spark udf interface to an arbitrary python function.

    In python, invoke the function by `my_function(a, b)`; in a spark context, invoke
    the udf by `my_function.udf('col-a', 'col-b')`. Additionally, keyword arguments can
    be overridden through the "dynamic udf interface", e.g.
    `my_function.dudf(some_keyword=other_value)('col-a', 'col-b')`

    Args:
        fn (FunctionType): function to wrap
        return_type (DataType): the spark data type the wrapped function should return

    Examples:
        >>> @udfy(return_type=FloatType())
        ... def foo(val, offset=2, mult=3):
        ...     return float(val * mult + offset)
        >>> foo(8)
        26.0
        >>> sdf = spark.range(10)
        >>> sdf.show(3)
        +---+
        | id|
        +---+
        |  0|
        |  1|
        |  2|
        +---+
        only showing top 3 rows
        >>> sdf.select(foo.udf('id')).show(3)
        +-------+
        |foo(id)|
        +-------+
        |    2.0|
        |    5.0|
        |    8.0|
        +-------+
        only showing top 3 rows
        >>> sdf.select(foo.dudf(mult=11, offset=-5)('id')).show(3)
        +-------+
        |foo(id)|
        +-------+
        |   -5.0|
        |    6.0|
        |   17.0|
        +-------+
        only showing top 3 rows

    Returns:
        Wrapped function object
    """

    def wrap(f):
        @wraps(f)
        def outer(**kwargs):
            @wraps(f)
            def inner(*args):
                return f(*args, **kwargs)

            return F.udf(inner, returnType=return_type)

        # Use some hacky deepcopy trick to avoid `AssertionError` while pickling
        fcopied = _copy_func(f)
        fcopied.udf = outer()
        fcopied.dudf = outer
        return fcopied

    if fn is None:
        return wrap
    else:
        return wrap(fn)

---- dataland/utils.py ----

import os
from functools import wraps
from typing import Dict


def qualname(obj, default: str = "", level: int = 1) -> str:
    module = getattr(obj, "__module__", default)
    name = getattr(obj, "__qualname__", default)
    joined = ".".join([nm for nm in [module, name] if nm])
    return ".".join(joined.split(".")[-level:])


def deprecated(cls_or_fn=None, replaced_by=None):
    """A wrapper to mark any implementation (class or function) as deprecated"""

    def wrapper(_callable):
        @wraps(_callable)
        def inner(*args, **kwargs):
            replacement = (
                f", and will be replaced by: `{qualname(replaced_by)}`"
                if replaced_by
                else ""
            )
            print(f"\n[WARNING] `{qualname(_callable)}` is deprecated{replacement}\n")
            return _callable(*args, **kwargs)

        return inner

    if cls_or_fn:
        return wrapper(cls_or_fn)
    else:
        return wrapper


class SimpleTemplate:
    __slots__ = ("_template", "_default_params")

    @classmethod
    def join_from(
        cls,
        other: "SimpleTemplate",
        template: str,
        sep: str = os.path.sep,
        **default_params: str,
    ) -> "SimpleTemplate":
        return cls(
            sep.join([other.template, template]),
            **dict(**other.default_params, **default_params),
        )

    def __init__(self, template: str, **default_params):
        self._template = template
        self._default_params = default_params

    @property
    def template(self) -> str:
        return self._template

    @property
    def default_params(self) -> Dict:
        return self._default_params

    def __call__(self, **params: str):
        populates = {k: params.get(k, v) for k, v in self.default_params.items()}
        return self.template.format(**populates)

    def to_str(self) -> str:
        return str(self)

    def __str__(self):
        return self.template.format(**self.default_params)

    def __repr__(self):
        return f"<SimpleTemplate: {self._template}>"
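A usage sketch for `SimpleTemplate` and `qualname`; the bucket layout is hypothetical, and the joined path shown assumes a POSIX `os.path.sep`.

from dataland.utils import SimpleTemplate, qualname

# SimpleTemplate only wraps str.format with stored default parameters.
daily = SimpleTemplate("gs://my-bucket/{table}/{date}", table="events", date="1970-01-01")
print(daily())                   # gs://my-bucket/events/1970-01-01
print(daily(date="2020-06-01"))  # gs://my-bucket/events/2020-06-01

# join_from extends an existing template (joined with os.path.sep by default).
partition = SimpleTemplate.join_from(daily, "part-{part}.parquet", part="00000")
print(partition(date="2020-06-01", part="00042"))
# gs://my-bucket/events/2020-06-01/part-00042.parquet

print(qualname(SimpleTemplate.join_from, level=2))  # 'SimpleTemplate.join_from'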
---- dataland-0.1.4.dist-info/LICENSE ----

MIT License

Copyright (c)

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:

The above copyright notice and this permission notice (including the next
paragraph) shall be included in all copies or substantial portions of the
Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.