Source code for graphvite.application.application

# Copyright 2019 MilaGraph. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Zhaocheng Zhu

"""Implementation of applications"""
from __future__ import print_function, absolute_import, unicode_literals, division

import pickle
import logging
import multiprocessing
from collections import defaultdict

from future.builtins import str, map
from easydict import EasyDict
import numpy as np

from .. import dtype, auto
from .. import graph, solver
from ..util import monitor

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class ApplicationMixin(object):
    """
    General interface of graph applications.

    Subclasses provide the concrete graph and solver by overriding
    :meth:`get_graph` and :meth:`get_solver`.

    Parameters:
        dim (int): dimension of embeddings
        gpus (list of int, optional): GPU ids, default is all GPUs
        cpu_per_gpu (int, optional): number of CPU threads per GPU, default is all CPUs
        float_type (dtype, optional): type of parameters
        index_type (dtype, optional): type of graph indexes
    """
    def __init__(self, dim, gpus=None, cpu_per_gpu=auto, float_type=dtype.float32, index_type=dtype.uint32):
        self.dim = dim
        # build a fresh list per instance: a literal ``[]`` default would be
        # one object shared by every application created without ``gpus``
        self.gpus = [] if gpus is None else gpus
        self.cpu_per_gpu = cpu_per_gpu
        self.float_type = float_type
        self.index_type = index_type

    def get_graph(self, **kwargs):
        """Return the graph instance used by :meth:`load`. Must be overridden."""
        raise NotImplementedError

    def get_solver(self, **kwargs):
        """Return the solver instance used by :meth:`build`. Must be overridden."""
        raise NotImplementedError

    @monitor.time
    def load(self, **kwargs):
        """load(**kwargs)
        Load a graph from file or Python object.
        Arguments depend on the underlying graph type.
        """
        # the same keyword arguments select the graph and are forwarded to its loader
        self.graph = self.get_graph(**kwargs)
        self.graph.load(**kwargs)

    @monitor.time
    def build(self, **kwargs):
        """build(**kwargs)
        Build the solver from the graph.
        Arguments depend on the underlying solver type.
        """
        self.solver = self.get_solver(**kwargs)
        self.solver.build(self.graph, **kwargs)

    @monitor.time
    def train(self, **kwargs):
        """train(**kwargs)
        Train embeddings with the solver.
        Arguments depend on the underlying solver type.
        """
        self.solver.train(**kwargs)

    @monitor.time
    def evaluate(self, task, **kwargs):
        """evaluate(task, **kwargs)
        Evaluate the learned embeddings on a downstream task.
        Arguments depend on the underlying graph type and the task.

        The task name is mapped to a method of this application by replacing
        spaces with underscores. Every metric returned by that method is logged.

        Parameters:
            task (str): name of task

        Raises:
            ValueError: if the application has no attribute matching the task
        """
        self.solver.clear()

        func_name = task.replace(" ", "_")
        if not hasattr(self, func_name):
            raise ValueError("Unknown task `%s`" % task)
        logger.info("evaluate on %s", task)
        result = getattr(self, func_name)(**kwargs)
        for metric, value in sorted(result.items()):
            logger.warning("%s: %g", metric, value)

    @monitor.time
    def save(self, file_name):
        """
        Save embeddings and name mappings in numpy format.

        Every attribute of the graph that is a non-empty list of strings and
        every attribute of the solver that is a numpy array is collected into
        one dictionary, which is pickled to ``file_name``.

        Parameters:
            file_name (str): file name
        """
        logger.warning("save embeddings and name mappings to `%s`", file_name)

        objects = EasyDict()
        for name in dir(self.graph):
            value = getattr(self.graph, name)
            if isinstance(value, list) and len(value) > 0 and isinstance(value[0], str): # id2name
                objects[name] = value
        for name in dir(self.solver):
            value = getattr(self.solver, name)
            if isinstance(value, np.ndarray): # embedding
                objects[name] = value

        with open(file_name, "wb") as fout:
            pickle.dump(objects, fout, protocol=pickle.HIGHEST_PROTOCOL)

    def name_map(self, dicts, names):
        """
        Translate parallel columns of names into indexes, dropping incomplete samples.

        Sample ``i`` is kept only if ``names[j][i]`` is a key of ``dicts[j]``
        for every column ``j``.

        Parameters:
            dicts (sequence of dict): one name-to-index mapping per column
            names (sequence of sequence): one column of names per mapping;
                the number of samples is taken from the first column

        Returns:
            list of list: one list of indexes per column, all of equal length
        """
        assert len(dicts) == len(names), "The number of dictionaries and names must be equal"

        num_param = len(names)
        num_sample = len(names[0])
        indexes = [[] for _ in range(num_param)]
        for i in range(num_sample):
            # skip the sample as soon as one of its names is unknown
            if all(names[j][i] in dicts[j] for j in range(num_param)):
                for j in range(num_param):
                    indexes[j].append(dicts[j][names[j][i]])
        return indexes

    def gpu_map(self, func, settings):
        """
        Apply ``func`` to every setting, using one worker process per GPU.

        A GPU id is appended to each setting tuple, cycling through
        ``self.gpus`` (or through all CUDA devices reported by torch when
        ``self.gpus`` is empty). Workers are started in ``spawn`` mode; the
        previous start method is restored afterwards. When ``multiprocessing``
        has no start method API, the settings are processed serially instead.

        Parameters:
            func (callable): function taking a single setting tuple;
                it is sent to worker processes by ``multiprocessing``
            settings (list of tuple): arguments of each call, without the GPU id

        Returns:
            list: results of ``func``, in the order of ``settings``
        """
        import torch

        gpus = self.gpus if self.gpus else range(torch.cuda.device_count())
        settings = [setting + (gpus[i % len(gpus)],) for i, setting in enumerate(settings)]

        # Only the start method lookup is guarded: catching AttributeError
        # around the whole run would hide errors raised inside ``func`` and
        # silently execute everything a second time.
        try:
            start_method = multiprocessing.get_start_method()
        except AttributeError:
            logger.info("Spawn mode is not supported by multiprocessing. Switch to serial execution.")
            return list(map(func, settings))

        # if there are other running processes, this could cause leakage of semaphores
        multiprocessing.set_start_method("spawn", force=True)
        try:
            pool = multiprocessing.Pool(len(gpus))
            try:
                results = pool.map(func, settings, chunksize=1)
            finally:
                # release the worker processes whether or not the map succeeded
                pool.close()
                pool.join()
        finally:
            # restore the caller's start method even if a worker failed
            multiprocessing.set_start_method(start_method, force=True)

        return results


class GraphApplication(ApplicationMixin):
    r"""
    Node embedding application.

    Given a graph, it embeds each node into a continuous vector representation.
    The learned embeddings can be used for many downstream tasks.
    e.g. **node classification**, **link prediction**, **node analogy**.
    The similarity between node embeddings can be measured by cosine distance.

    Supported Models:
        - DeepWalk (`DeepWalk: Online Learning of Social Representations`_)
        - LINE (`LINE: Large-scale Information Network Embedding`_)
        - node2vec (`node2vec: Scalable Feature Learning for Networks`_)

    .. _DeepWalk\: Online Learning of Social Representations:
        https://arxiv.org/pdf/1403.6652.pdf
    .. _LINE\: Large-scale Information Network Embedding:
        https://arxiv.org/pdf/1503.03578.pdf
    .. _node2vec\: Scalable Feature Learning for Networks:
        https://www.kdd.org/kdd2016/papers/files/rfp0218-groverA.pdf

    Parameters:
        dim (int): dimension of embeddings
        gpus (list of int, optional): GPU ids, default is all GPUs
        cpu_per_gpu (int, optional): number of CPU threads per GPU, default is all CPUs
        float_type (dtype, optional): type of parameters
        index_type (dtype, optional): type of graph indexes

    See also:
        :class:`Graph <graphvite.graph.Graph>`,
        :class:`GraphSolver <graphvite.solver.GraphSolver>`
    """
    # The docstring is a raw string because the reST link targets contain
    # ``\:``, which is an invalid escape sequence in a normal string literal.

    def get_graph(self, **kwargs):
        """Create an empty :class:`Graph <graphvite.graph.Graph>` with this application's index type."""
        return graph.Graph(self.index_type)

    def get_solver(self, **kwargs):
        """Create the :class:`GraphSolver <graphvite.solver.GraphSolver>` for this application."""
        if self.cpu_per_gpu == auto:
            num_sampler_per_worker = auto
        else:
            # NOTE(review): presumably one CPU thread per GPU is kept for the worker itself - confirm
            num_sampler_per_worker = self.cpu_per_gpu - 1
        return solver.GraphSolver(self.dim, self.float_type, self.index_type, self.gpus, num_sampler_per_worker)

    def node_classification(self, X=None, Y=None, file_name=None, portions=(0.02,), normalization=False, times=1,
                            patience=100):
        """
        Evaluate node embeddings on node classification task.

        Labels are given either as parallel lists ``X`` / ``Y`` or as a file with
        one whitespace-separated ``node label`` pair per line. Pairs whose node is
        not in the graph are ignored. One classification job is run per training
        portion through :meth:`gpu_map`.

        Parameters:
            X (list of str, optional): names of nodes
            Y (list, optional): labels of nodes
            file_name (str, optional): file of nodes & labels
            portions (tuple of float, optional): how much data for training
            normalization (bool, optional): normalize the embeddings or not
            times (int, optional): number of trials
            patience (int, optional): patience on loss convergence

        Returns:
            dict: macro-F1 & micro-F1 averaged over all trials

        Raises:
            ValueError: if both or neither of (X, Y) and file_name are provided
        """
        import scipy.sparse as sp

        if file_name:
            if not (X is None and Y is None):
                raise ValueError("Evaluation data and file should not be provided at the same time")
            X = []
            Y = []
            with open(file_name, "r") as fin:
                for line in fin:
                    x, y = line.split()
                    X.append(x)
                    Y.append(y)
        if X is None or Y is None:
            raise ValueError("Either evaluataion data (X, Y) or a file name should be provided")

        name2id = self.graph.name2id
        class2id = {c: i for i, c in enumerate(np.unique(Y))}
        new_X, new_Y = self.name_map((name2id, class2id), (X, Y))
        logger.info("effective labels: %d / %d" % (len(new_X), len(X)))
        X = np.asarray(new_X)
        Y = np.asarray(new_Y)

        # multi-hot (node x class) label matrix; the builtin ``int`` replaces the
        # ``np.int`` alias, which no longer exists in recent NumPy releases
        labels = sp.coo_matrix((np.ones_like(X), (X, Y)), dtype=int).todense()
        # ``todense`` yields a 2D matrix, so the row sums stay 2D and ``np.where``
        # returns a (rows, columns) pair; only the rows are needed
        indexes, _ = np.where(np.sum(labels, axis=1) > 0) # discard non-labeled nodes
        labels = labels[indexes]
        vertex_embeddings = self.solver.vertex_embeddings[indexes]

        settings = []
        for portion in portions:
            settings.append((vertex_embeddings, labels, portion, normalization, times, patience))
        results = self.gpu_map(linear_classification, settings)

        metrics = {}
        for result in results:
            metrics.update(result)
        return metrics
def linear_classification(args):
    """
    Train and test a linear node classifier on one GPU.

    For each trial the samples are randomly split into a training part of
    ``portion`` and a test part. The classifier is trained with SGD until the
    training loss has not improved for ``patience`` epochs. At test time, a
    sample with ``k`` true labels is assigned its ``k`` highest-scoring classes
    (ties at the threshold are all kept).

    Parameters:
        args (tuple): ``(embeddings, labels, portion, normalization, times, patience, gpu)``,
            where ``labels`` is a 2D (sample x class) 0/1 matrix and ``gpu`` is a GPU id

    Returns:
        dict: ``macro-F1@<portion>%`` and ``micro-F1@<portion>%``, averaged over all trials
    """
    import torch
    from torch import optim
    from torch.nn import functional as F
    from .network import NodeClassifier

    def generate_one_vs_rest(indexes, labels):
        # expand every multi-label sample into one single-label sample per class
        new_indexes = []
        new_labels = []
        num_class = labels.shape[1]
        for index, sample_labels in zip(indexes, labels):
            for cls in np.where(sample_labels)[0]:
                new_indexes.append(index)
                # builtin ``int`` instead of the removed ``np.int`` alias
                new_label = np.zeros(num_class, dtype=int)
                new_label[cls] = 1
                new_labels.append(new_label)
        return torch.as_tensor(new_indexes), torch.as_tensor(new_labels)

    embeddings, labels, portion, normalization, times, patience, gpu = args
    num_sample, num_class = labels.shape
    num_train = int(num_sample * portion)

    macro_f1s = []
    micro_f1s = []
    for t in range(times):
        samples = np.random.permutation(num_sample)
        train_samples = samples[:num_train]
        train_labels = np.asarray(labels[train_samples])
        train_samples, train_labels = generate_one_vs_rest(train_samples, train_labels)
        test_samples = torch.as_tensor(samples[num_train:])
        test_labels = torch.as_tensor(labels[test_samples])

        model = NodeClassifier(embeddings, num_class, normalization=normalization)

        train_samples = train_samples.cuda(gpu)
        train_labels = train_labels.cuda(gpu)
        test_samples = test_samples.cuda(gpu)
        test_labels = test_labels.cuda(gpu)
        model = model.cuda(gpu)

        # train
        optimizer = optim.SGD(model.parameters(), lr=1, weight_decay=2e-5, momentum=0.9)
        best_loss = float("inf")
        best_epoch = -1
        for epoch in range(100000):
            optimizer.zero_grad()
            logits = model(train_samples)
            loss = F.binary_cross_entropy_with_logits(logits, train_labels.float())
            loss.backward()
            optimizer.step()

            loss = loss.item()
            if loss < best_loss:
                best_epoch = epoch
                best_loss = loss
            # stop once the loss has not improved for `patience` epochs
            if epoch == best_epoch + patience:
                break

        # test
        with torch.no_grad(): # no gradient is needed for evaluation
            logits = model(test_samples)
        num_labels = test_labels.sum(dim=1, keepdim=True)
        sorted_logits, _ = logits.sort(dim=1, descending=True)
        # score of the k-th best class, where k is the number of true labels of the sample
        thresholds = sorted_logits.gather(dim=1, index=num_labels - 1)
        predictions = (logits >= thresholds).long()

        # compute metric
        # NOTE(review): a class with neither true nor predicted test samples
        # gives 0 / 0 here, which would turn macro-F1 into NaN - confirm whether intended
        num_TP_per_class = (predictions & test_labels).sum(dim=0).float()
        num_T_per_class = test_labels.sum(dim=0).float()
        num_P_per_class = predictions.sum(dim=0).float()
        macro_f1s.append((2 * num_TP_per_class / (num_T_per_class + num_P_per_class)).mean().item())
        num_TP = (predictions & test_labels).sum().float()
        num_T = test_labels.sum().float()
        num_P = predictions.sum().float()
        micro_f1s.append((2 * num_TP / (num_T + num_P)).item())

    return {
        "macro-F1@%g%%" % (portion * 100): np.mean(macro_f1s),
        "micro-F1@%g%%" % (portion * 100): np.mean(micro_f1s)
    }
class WordGraphApplication(ApplicationMixin):
    """
    Word node embedding application.

    Given a corpus, it embeds each word into a continuous vector representation.
    The learned embeddings can be used for natural language processing tasks.
    This can be viewed as a variant of the word2vec algorithm, with random walk augmentation support.
    The similarity between node embeddings can be measured by cosine distance.

    Supported Models:
        - LINE (`LINE: Large-scale Information Network Embedding`_)

    Parameters:
        dim (int): dimension of embeddings
        gpus (list of int, optional): GPU ids, default is all GPUs
        cpu_per_gpu (int, optional): number of CPU threads per GPU, default is all CPUs
        float_type (dtype, optional): type of parameters
        index_type (dtype, optional): type of graph indexes

    See also:
        :class:`WordGraph <graphvite.graph.WordGraph>`,
        :class:`GraphSolver <graphvite.solver.GraphSolver>`
    """

    def get_graph(self, **kwargs):
        """Create an empty :class:`WordGraph <graphvite.graph.WordGraph>` with this application's index type."""
        return graph.WordGraph(self.index_type)

    def get_solver(self, **kwargs):
        """Create the :class:`GraphSolver <graphvite.solver.GraphSolver>` for this application."""
        # keep `auto` untouched; otherwise use one sampler less than the CPU threads per GPU
        num_sampler_per_worker = auto if self.cpu_per_gpu == auto else self.cpu_per_gpu - 1
        return solver.GraphSolver(
            self.dim,
            self.float_type,
            self.index_type,
            self.gpus,
            num_sampler_per_worker,
        )
class KnowledgeGraphApplication(ApplicationMixin):
    r"""
    Knowledge graph embedding application.

    Given a knowledge graph, it embeds each entity and relation into a continuous vector representation respectively.
    The learned embeddings can be used for analysis of knowledge graphs.
    e.g. **entity clustering**, **link prediction**.
    The likelihood of edges can be inferred by the score function over embeddings of triplets.

    Supported Models:
        - TransE (`Translating Embeddings for Modeling Multi-relational Data`_)
        - DistMult (`Embedding Entities and Relations for Learning and Inference in Knowledge Bases`_)
        - ComplEx (`Complex Embeddings for Simple Link Prediction`_)
        - SimplE (`SimplE Embedding for Link Prediction in Knowledge Graphs`_)
        - RotatE (`RotatE: Knowledge Graph Embedding by Relational Rotation in Complex Space`_)

    .. _Translating Embeddings for Modeling Multi-relational Data:
        http://papers.nips.cc/paper/5071-translating-embeddings-for-modeling-multi-relational-data.pdf
    .. _Embedding Entities and Relations for Learning and Inference in Knowledge Bases:
        https://arxiv.org/pdf/1412.6575.pdf
    .. _Complex Embeddings for Simple Link Prediction:
        http://proceedings.mlr.press/v48/trouillon16.pdf
    .. _SimplE Embedding for Link Prediction in Knowledge Graphs:
        https://papers.nips.cc/paper/7682-simple-embedding-for-link-prediction-in-knowledge-graphs.pdf
    .. _RotatE\: Knowledge Graph Embedding by Relational Rotation in Complex Space:
        https://arxiv.org/pdf/1902.10197.pdf

    Parameters:
        dim (int): dimension of embeddings
        gpus (list of int, optional): GPU ids, default is all GPUs
        cpu_per_gpu (int, optional): number of CPU threads per GPU, default is all CPUs
        float_type (dtype, optional): type of parameters
        index_type (dtype, optional): type of graph indexes

    Note:
        The implementation of TransE, DistMult and ComplEx, SimplE are slightly different from their original papers.
        The loss function and the regularization term generally follow `this repo`_.
        Adversarial negative sampling is also adopted in these models like RotatE.

    .. _this repo: https://github.com/DeepGraphLearning/KnowledgeGraphEmbedding

    See also:
        :class:`KnowledgeGraph <graphvite.graph.KnowledgeGraph>`,
        :class:`KnowledgeGraphSolver <graphvite.solver.KnowledgeGraphSolver>`
    """
    # The docstring is a raw string because the RotatE link target contains
    # ``\:``, which is an invalid escape sequence in a normal string literal.

    def get_graph(self, **kwargs):
        """Create an empty :class:`KnowledgeGraph <graphvite.graph.KnowledgeGraph>` with this application's index type."""
        return graph.KnowledgeGraph(self.index_type)

    def get_solver(self, **kwargs):
        """Create the :class:`KnowledgeGraphSolver <graphvite.solver.KnowledgeGraphSolver>` for this application."""
        if self.cpu_per_gpu == auto:
            num_sampler_per_worker = auto
        else:
            # NOTE(review): presumably one CPU thread per GPU is kept for the worker itself - confirm
            num_sampler_per_worker = self.cpu_per_gpu - 1
        return solver.KnowledgeGraphSolver(self.dim, self.float_type, self.index_type, self.gpus,
                                           num_sampler_per_worker)
def link_prediction(args):
    """
    Rank the true tail and the true head of every test triplet on one GPU.

    For each triplet ``(h, r, t)``, the true tail is scored together with every
    entity that is not in ``exclude_T[(h, r)]``, and the true head together with
    every entity that is not in ``exclude_H[(t, r)]``. The ranking of the true
    entity is the number of candidates (itself included) whose score is greater
    than or equal to its own.

    Parameters:
        args (tuple): ``(entity_embeddings, relation_embeddings, H, R, T,
            exclude_H, exclude_T, score_function, gpu)``

    Returns:
        numpy.ndarray: rankings, two per triplet (tail ranking, then head ranking)
    """
    import torch
    from .network import LinkPredictor

    entity_embeddings, relation_embeddings, H, R, T, exclude_H, exclude_T, score_function, gpu = args

    num_entity = len(entity_embeddings)
    model = LinkPredictor(score_function, entity_embeddings, relation_embeddings, entity_embeddings)
    model = model.cuda(gpu)

    # the candidate set is the same for every triplet, so build it only once
    all_entities = set(range(num_entity))

    rankings = []
    with torch.no_grad(): # scores are only compared, never back-propagated
        for h, r, t in zip(H, R, T):
            # tail prediction: the true tail sits at position 0 of the batch
            negatives = list(all_entities - exclude_T[(h, r)])
            batch_size = len(negatives) + 1

            batch_h = h * torch.ones(batch_size, dtype=torch.long)
            batch_r = r * torch.ones(batch_size, dtype=torch.long)
            batch_t = torch.as_tensor([t] + negatives)
            batch_h = batch_h.cuda(gpu)
            batch_r = batch_r.cuda(gpu)
            batch_t = batch_t.cuda(gpu)

            score = model(batch_h, batch_r, batch_t)
            rankings.append((score >= score[0]).sum().item())

            # head prediction: the true head sits at position 0 of the batch
            negatives = list(all_entities - exclude_H[(t, r)])
            batch_size = len(negatives) + 1

            batch_h = torch.as_tensor([h] + negatives)
            batch_r = r * torch.ones(batch_size, dtype=torch.long)
            batch_t = t * torch.ones(batch_size, dtype=torch.long)
            batch_h = batch_h.cuda(gpu)
            batch_r = batch_r.cuda(gpu)
            batch_t = batch_t.cuda(gpu)

            score = model(batch_h, batch_r, batch_t)
            rankings.append((score >= score[0]).sum().item())

    rankings = np.asarray(rankings)
    return rankings
class VisualizationApplication(ApplicationMixin):
    """
    Graph & high-dimensional data visualization.

    Given a graph or high-dimensional vectors, it maps each node to 2D or 3D coordinates to
    facilitate visualization. The learned coordinates preserve most local similarity information
    of the original input, and may shed some light on the structure of the graph or the
    high-dimensional space.

    Supported Models:
        - LargeVis (`Visualizing Large-scale and High-dimensional Data`_)

    .. _Visualizing Large-scale and High-dimensional Data:
        https://arxiv.org/pdf/1602.00370.pdf

    Parameters:
        dim (int): dimension of embeddings
        gpus (list of int, optional): GPU ids, default is all GPUs
        cpu_per_gpu (int, optional): number of CPU threads per GPU, default is all CPUs
        float_type (dtype, optional): type of parameters
        index_type (dtype, optional): type of graph indexes

    See also:
        :class:`Graph <graphvite.graph.Graph>`,
        :class:`KNNGraph <graphvite.graph.KNNGraph>`,
        :class:`VisualizationSolver <graphvite.solver.VisualizationSolver>`
    """

    # points further than this many standard deviations from the mean,
    # along any axis, are treated as outliers and not drawn
    OUTLIER_THRESHOLD = 5

    def get_graph(self, **kwargs):
        """
        Create the graph matching the arguments given to :meth:`load`.

        A plain graph is used when ``file_name`` or ``edge_list`` is given,
        a KNN graph otherwise.
        """
        if "file_name" in kwargs or "edge_list" in kwargs:
            return graph.Graph(self.index_type)
        else:
            return graph.KNNGraph(self.index_type, self.gpus, self.cpu_per_gpu)

    def get_solver(self, **kwargs):
        """Create the :class:`VisualizationSolver <graphvite.solver.VisualizationSolver>` for this application."""
        if self.cpu_per_gpu == auto:
            num_sampler_per_worker = auto
        else:
            # NOTE(review): presumably one CPU thread per GPU is kept for the worker itself - confirm
            num_sampler_per_worker = self.cpu_per_gpu - 1
        return solver.VisualizationSolver(self.dim, self.float_type, self.index_type, self.gpus,
                                          num_sampler_per_worker)

    def _inlier_indexes(self, coordinates):
        """Return the row indexes of points within ``OUTLIER_THRESHOLD`` standard deviations on every axis."""
        mean = np.mean(coordinates, axis=0)
        std = np.std(coordinates, axis=0)
        inside = np.abs(coordinates - mean) < self.OUTLIER_THRESHOLD * std
        indexes, = np.where(np.all(inside, axis=1))
        return indexes

    def _render_frames(self, func, settings):
        """Render one frame per setting in a process pool, and always release the pool."""
        pool = multiprocessing.Pool(self.solver.num_worker + self.solver.num_sampler)
        try:
            return pool.map(func, settings)
        finally:
            pool.close()
            pool.join()

    def visualization(self, Y=None, file_name=None, save_file=None, figure_size=10, scale=2):
        """
        Visualize learned 2D or 3D coordinates.

        Parameters:
            Y (list of str, optional): labels of vectors
            file_name (str, optional): file of labels
            save_file (str, optional): ``png`` or ``pdf`` file to save visualization,
                if not provided, show the figure in window
            figure_size (int, optional): size of figure
            scale (int, optional): size of points

        Returns:
            dict: always empty, this task has no metric

        Raises:
            ValueError: if the coordinates are not 2D or 3D,
                or if both Y and file_name are provided
        """
        from matplotlib import pyplot as plt
        plt.switch_backend("agg") # for compatibility

        coordinates = self.solver.coordinates
        dim = coordinates.shape[1]
        if not (dim == 2 or dim == 3):
            raise ValueError("Can't visualize %dD data" % dim)

        if file_name:
            if not (Y is None):
                raise ValueError("Evaluation data and file should not be provided at the same time")
            with open(file_name, "r") as fin:
                Y = [line.strip() for line in fin]
        elif Y is None:
            Y = ["unknown"] * self.graph.num_vertex
        Y = np.asarray(Y)

        indexes = self._inlier_indexes(coordinates) # discard outliers
        coordinates = coordinates[indexes]
        Y = Y[indexes]

        classes = sorted(np.unique(Y))
        fig = plt.figure(figsize=(figure_size, figure_size))
        if dim == 2:
            ax = fig.gca()
        elif dim == 3:
            from mpl_toolkits.mplot3d import Axes3D
            # `Figure.gca` no longer accepts a projection in recent Matplotlib releases
            ax = fig.add_subplot(111, projection="3d")
        for cls in classes:
            indexes, = np.where(Y == cls)
            ax.scatter(*coordinates[indexes].T, s=scale)
        ax.set_xticks([])
        ax.set_yticks([])
        if dim == 3:
            ax.set_zticks([])
        if len(classes) > 1:
            ax.legend(classes, markerscale=6, loc="upper right")
        if save_file:
            logger.warning("save visualization to `%s`" % save_file)
            plt.savefig(save_file)
        else:
            # NOTE(review): the backend was switched to non-interactive "agg" above,
            # so this may not actually open a window - confirm
            plt.show()
        return {}

    def hierarchy(self, HY=None, file_name=None, target=None, save_file=None, figure_size=10, scale=2, duration=3):
        """
        Visualize learned 2D coordinates with hierarchical labels.

        One frame is rendered per level of the hierarchy, from the first level
        down to the level where ``target`` is found, and the frames are saved
        as an animated image.

        Parameters:
            HY (list of list of str, optional): hierarchical labels of vectors
            file_name (str, optional): file of hierarchical labels
            target (str): target class
            save_file (str): ``gif`` file to save visualization
            figure_size (int, optional): size of figure
            scale (int, optional): size of points
            duration (float, optional): duration of each frame in seconds

        Returns:
            dict: always empty, this task has no metric

        Raises:
            ValueError: if the coordinates are not 2D, if both or neither of HY
                and file_name are provided, or if target is missing or not found
        """
        import imageio
        from matplotlib import pyplot as plt
        plt.switch_backend("agg") # for compatibility

        coordinates = self.solver.coordinates
        dim = coordinates.shape[1]
        if dim != 2:
            raise ValueError("Can't visualize the hierarchy of %dD data" % dim)

        if file_name:
            if not (HY is None):
                raise ValueError("Evaluation data and file should not be provided at the same time")
            with open(file_name, "r") as fin:
                HY = [line.split() for line in fin]
        elif HY is None:
            raise ValueError("No label is provided for hierarchy")
        HY = np.asarray(HY)
        # `render_hierarchy` overwrites labels with "else", so every label slot must
        # be at least that wide, for byte strings and unicode strings alike
        min_length = len("else")
        kind = HY.dtype.kind
        if kind in ("S", "U"):
            length = HY.dtype.itemsize // np.dtype(kind + "1").itemsize
            if length < min_length:
                HY = HY.astype("%s%d" % (kind, min_length))

        indexes = self._inlier_indexes(coordinates) # discard outliers
        coordinates = coordinates[indexes]
        HY = HY[indexes].T # one row of labels per hierarchy level

        if target is None:
            raise ValueError("Target class is not provided")
        # find the first level that contains the target, and one sample of it
        for depth, Y in enumerate(HY):
            indexes, = np.where(Y == target)
            if len(indexes) > 0:
                sample = indexes[0]
                break
        else:
            raise ValueError("Can't find target `%s` in the hierarchy" % target)

        settings = [(coordinates, None, HY[0], sample, figure_size, scale, 0)]
        for i in range(depth):
            settings.append((coordinates, HY[i], HY[i + 1], sample, figure_size, scale, i + 1))
        frames = self._render_frames(render_hierarchy, settings)
        logger.warning("save hierarchy to `%s`" % save_file)
        imageio.mimsave(save_file, frames, fps=1 / duration, subrectangles=True)

        return {}

    def animation(self, Y=None, file_name=None, save_file=None, figure_size=5, scale=1, elevation=30, num_frame=700):
        """
        Rotate learned 3D coordinates as an animation.

        Parameters:
            Y (list of str, optional): labels of vectors
            file_name (str, optional): file of labels
            save_file (str): ``gif`` file to save visualization
            figure_size (int, optional): size of figure
            scale (int, optional): size of points
            elevation (float, optional): elevation angle
            num_frame (int, optional): number of frames

        Returns:
            dict: always empty, this task has no metric

        Raises:
            ValueError: if the coordinates are not 3D,
                or if both Y and file_name are provided
        """
        import imageio
        from matplotlib import pyplot as plt
        from mpl_toolkits.mplot3d import Axes3D
        plt.switch_backend("agg") # for compatibility

        coordinates = self.solver.coordinates
        dim = coordinates.shape[1]
        if dim != 3:
            raise ValueError("Can't animate %dD data" % dim)

        if file_name:
            if not (Y is None):
                raise ValueError("Evaluation data and file should not be provided at the same time")
            with open(file_name, "r") as fin:
                Y = [line.strip() for line in fin]
        elif Y is None:
            Y = ["unknown"] * self.graph.num_vertex
        Y = np.asarray(Y)

        indexes = self._inlier_indexes(coordinates) # discard outliers
        coordinates = coordinates[indexes]
        Y = Y[indexes]

        # one frame per azimuth angle, covering a full turn
        settings = []
        degrees = np.linspace(0, 360, num_frame, endpoint=False)
        for degree in degrees:
            settings.append((coordinates, Y, degree, figure_size, scale, elevation))
        frames = self._render_frames(render_animation, settings)
        logger.warning("save animation to `%s`" % save_file)
        imageio.mimsave(save_file, frames, fps=num_frame / 70, subrectangles=True) # 70 seconds

        return {}
def render_hierarchy(args):
    """
    Render one frame of the hierarchy visualization.

    Points whose parent label differs from the parent label of ``sample`` are
    relabeled ``"else"`` and drawn in light grey. The class of ``sample`` is
    listed first and drawn on top.

    Parameters:
        args (tuple): ``(coordinates, H, Y, sample, figure_size, scale, depth)``,
            where ``H`` holds the parent-level labels (``None`` for the first level),
            ``Y`` the labels of this level and ``sample`` the index of a target sample;
            ``depth`` is not used

    Returns:
        numpy.ndarray: the rendered frame
    """
    from matplotlib import pyplot as plt
    plt.switch_backend("agg")

    coordinates, H, Y, sample, figure_size, scale, depth = args

    fig = plt.figure(figsize=(figure_size, figure_size))
    ax = fig.gca()
    if H is not None:
        # relabel on a copy, so that the caller's label array is left untouched
        Y = np.array(Y)
        Y[np.asarray(H) != H[sample]] = "else"
    classes = set(Y)
    classes.discard(Y[sample])
    classes.discard("else")
    classes = [Y[sample]] + sorted(classes) + ["else"]
    for i, cls in enumerate(classes):
        indexes, = np.where(Y == cls)
        color = "lightgrey" if cls == "else" else None
        # earlier classes get a higher z-order, i.e. they are drawn on top
        ax.scatter(*coordinates[indexes].T, s=scale, c=color, zorder=-i)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.legend(classes, markerscale=6, loc="upper right")
    fig.canvas.draw()
    # copy the pixels out of the canvas, then close the figure:
    # pyplot keeps every figure alive until it is explicitly closed
    frame = np.array(fig.canvas.renderer._renderer)
    plt.close(fig)

    return frame


def render_animation(args):
    """
    Render one frame of the 3D rotation animation.

    Parameters:
        args (tuple): ``(coordinates, Y, degree, figure_size, scale, elevation)``,
            where ``degree`` is the azimuth angle of this frame

    Returns:
        numpy.ndarray: the rendered frame
    """
    from matplotlib import pyplot as plt
    from mpl_toolkits.mplot3d import Axes3D
    plt.switch_backend("agg")

    coordinates, Y, degree, figure_size, scale, elevation = args
    classes = sorted(np.unique(Y))

    fig = plt.figure(figsize=(figure_size, figure_size))
    # `Figure.gca` no longer accepts a projection in recent Matplotlib releases
    ax = fig.add_subplot(111, projection="3d")
    for cls in classes:
        indexes, = np.where(Y == cls)
        ax.scatter(*coordinates[indexes].T, s=scale)
    ax.view_init(elev=elevation, azim=degree)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_zticks([])
    if len(classes) > 1:
        ax.legend(classes, markerscale=6)
    fig.canvas.draw()
    # copy the pixels out of the canvas, then close the figure:
    # pyplot keeps every figure alive until it is explicitly closed
    frame = np.array(fig.canvas.renderer._renderer)
    plt.close(fig)

    return frame
class Application(object):
    """
    Application(type, *args, **kwargs)
    Factory that builds an application instance of the requested type.

    Parameters:
        type (str): application type, one of 'graph', 'word_graph',
            'knowledge_graph' or 'visualization'
    """

    # registry of application names and the classes implementing them
    application = {
        "graph": GraphApplication,
        "word_graph": WordGraphApplication,
        "knowledge_graph": KnowledgeGraphApplication,
        "visualization": VisualizationApplication
    }

    def __new__(cls, type, *args, **kwargs):
        # look the class up first, so that only unknown names are rejected here
        # and the remaining arguments go straight to the concrete constructor
        factory = cls.application.get(type)
        if factory is None:
            raise ValueError("Unknown application `%s`" % type)
        return factory(*args, **kwargs)
# Public names of this module: the application factory and the application classes.
__all__ = [ "Application", "GraphApplication", "WordGraphApplication", "KnowledgeGraphApplication", "VisualizationApplication" ]