1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//! Definance Virtual Machine
//! server implementation on tonic & tokio.
//! Run with `cargo run --bin dvm "http://[::1]:50051" "http://[::1]:50052"`

#[macro_use]
extern crate log;

use http::Uri;
use clap::Clap;

use tonic::transport::Server;
use futures::future::FutureExt;

use compiler::Compiler;
use services::compiler::CompilerService;
use services::metadata::MetadataService;

use dvm_net::{prelude::*, api, tonic};
use api::grpc::dvm_compiler_server::DvmCompilerServer;
use api::grpc::dvm_bytecode_metadata_server::DvmBytecodeMetadataServer;
use dvm_net::api::grpc::{
    vm_script_executor_server::VmScriptExecutorServer,
    vm_module_publisher_server::VmModulePublisherServer,
};
use data_source::{GrpcDataSource, ModuleCache, DsMeter};
use anyhow::Result;
use services::vm::VmService;
use dvm_cli::config::*;
use dvm_cli::{init, version};
use futures::join;
use dvm_info::config::{InfoServiceConfig, MemoryOptions};
use dvm_cli::info_service::create_info_service;
use dvm_info::memory_check::MemoryChecker;
use runtime::vm::dvm::Dvm;

/// Definance Virtual Machine
///  combined with Move compilation server
///  powered by gRPC interface on top of TCP/IPC.
/// API described in protobuf schemas: https://github.com/dfinance/dvm-proto
#[derive(Debug, Clone, Clap)]
#[clap(name = "dvm", version = version!())]
#[clap(verbatim_doc_comment)]
struct Options {
    /// Address in the form of HOST_ADDRESS:PORT.
    /// The address will be listen to by DVM and compilation server.
    /// Listening localhost by default.
    /// Supports schemes: http, ipc.
    #[clap(
        name = "listen address",
        default_value = "http://[::1]:50051",
        verbatim_doc_comment
    )]
    address: Endpoint,

    #[clap(flatten)]
    info_service: InfoServiceConfig,

    #[clap(flatten)]
    memory_config: MemoryOptions,

    /// DataSource Server internet address.
    #[clap(
    name = "Data-Source URI",
    env = DVM_DATA_SOURCE,
    default_value = "http://[::1]:50052"
    )]
    ds: Uri,

    #[clap(flatten)]
    logging: LoggingOptions,

    #[clap(flatten)]
    integrations: IntegrationsOptions,
}

fn main() -> Result<()> {
    remove_empty_env_vars();
    let options = Options::parse();
    let _guard = init(&options.logging, &options.integrations);
    main_internal(options)
}

#[tokio::main]
async fn main_internal(options: Options) -> Result<()> {
    let (serv_term_tx, serv_term_rx) = futures::channel::oneshot::channel();
    let (ds_term_tx, ds_term_rx) = tokio::sync::oneshot::channel();
    let sigterm = dvm_cli::init_sigterm_handler_fut(move || {
        // shutdown DS
        match ds_term_tx.send(()) {
            Ok(_) => info!("shutting down DS client"),
            Err(err) => error!("unable to send sig into the DS client: {:?}", err),
        }

        // shutdown server
        match serv_term_tx.send(()) {
            Ok(_) => info!("shutting down VM server"),
            Err(err) => error!("unable to send sig into the server: {:?}", err),
        }
    });

    let (info_service, hrm) = create_info_service(options.info_service);

    // data-source client
    let ds = GrpcDataSource::new(options.ds, Some(ds_term_rx))
        .expect("Unable to instantiate GrpcDataSource.");
    let ds = ModuleCache::new(DsMeter::new(ds), options.memory_config.module_cache());
    let mem_checker = MemoryChecker::new(options.memory_config);
    // vm services
    let vm_service = VmService::new(Dvm::new(ds.clone(), Some(mem_checker)), hrm);
    // comp services
    let compiler_service = CompilerService::new(Compiler::new(ds));
    let metadata_service = MetadataService::default();

    // spawn the signal-router:
    tokio::spawn(sigterm);
    // block-on the server:
    let dvm = Server::builder()
        // vm service
        .add_service(VmScriptExecutorServer::new(vm_service.clone()))
        .add_service(VmModulePublisherServer::new(vm_service.clone()))
        // comp services
        .add_service(DvmCompilerServer::new(compiler_service.clone()))
        .add_service(DvmBytecodeMetadataServer::new(metadata_service))
        // serve
        .serve_ext_with_shutdown(options.address, serv_term_rx.map(|_| ()))
        .map(|res| {
            info!("VM server is shutted down");
            res
        });

    if let Some(info_service) = info_service {
        let (_info_service, dvm) = join!(info_service, dvm);
        dvm.expect("Dvm internal error");
    } else {
        dvm.await.expect("Dvm internal error");
    }

    Ok(())
}