Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

306

307

308

309

310

311

312

313

314

315

316

317

318

319

320

321

322

323

324

325

326

327

328

329

330

331

332

333

334

335

336

337

338

339

340

341

342

343

344

345

346

347

348

349

350

351

352

353

354

355

356

357

358

359

360

361

362

363

364

365

366

367

368

369

370

371

372

373

374

375

376

377

378

379

380

381

382

383

384

385

386

387

388

389

390

391

392

393

394

395

396

397

398

399

400

401

402

403

404

405

406

407

408

409

410

411

412

413

414

415

416

417

418

419

420

421

422

423

424

425

426

427

428

429

430

431

432

433

434

435

436

437

438

439

440

441

442

443

444

445

446

447

448

449

450

451

452

453

454

455

456

457

458

459

460

461

462

463

464

465

466

467

468

469

470

471

472

473

474

475

476

477

478

479

480

481

482

483

484

485

486

487

488

489

490

491

492

493

494

495

496

497

498

499

500

501

502

503

504

505

506

507

508

509

510

511

512

513

514

515

516

517

518

519

520

521

522

523

524

525

526

527

528

529

530

531

532

533

534

535

536

537

538

539

540

541

542

543

544

545

546

547

548

549

550

551

552

553

554

555

556

557

558

559

560

561

562

563

564

# -*- coding: utf-8 -*- 

# 

# Copyright (C) 2008,2009 Andrew Resch <andrewresch@gmail.com> 

# 

# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with 

# the additional special exception to link portions of this program with the OpenSSL library. 

# See LICENSE for more details. 

# 

 

"""RPCServer Module""" 

 

import logging 

import os 

import stat 

import sys 

import traceback 

from types import FunctionType 

 

from OpenSSL import SSL, crypto 

from twisted.internet import defer, reactor 

from twisted.internet.protocol import Factory 

 

import deluge.component as component 

import deluge.configmanager 

from deluge.core.authmanager import AUTH_LEVEL_ADMIN, AUTH_LEVEL_DEFAULT, AUTH_LEVEL_NONE 

from deluge.error import DelugeError, IncompatibleClient, NotAuthorizedError, WrappedException, _ClientSideRecreateError 

from deluge.transfer import DelugeTransferProtocol 

 

# Message-type tags: the first element of every tuple the server sends to a
# client through DelugeRPCProtocol.sendData().
RPC_RESPONSE = 1  # result of an RPC request that completed without raising
RPC_ERROR = 2  # details of the exception raised while handling an RPC request
RPC_EVENT = 3  # event pushed to sessions that registered interest in it

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)

 

 

def export(auth_level=AUTH_LEVEL_DEFAULT):
    """
    Decorator that marks a method as callable over RPC.

    The owning object still has to be registered with an :class:`RPCServer`
    before the method can actually be called by clients.

    It can be applied bare (``@export``), in which case the decorated function
    arrives in the `auth_level` slot and the default auth level is used, or
    with an argument (``@export(auth_level)``).

    :param auth_level: the auth level required to call this method, or the
        function itself when the decorator is used without parentheses
    :type auth_level: int or function

    :returns: the decorated function (bare form) or the actual decorator

    """
    def wrap(func, *args, **kwargs):
        # Tag the function so RPCServer.register_object() picks it up.
        func._rpcserver_export = True
        func._rpcserver_auth_level = auth_level
        original_doc = func.__doc__
        banner = "**RPC Exported Function** (*Auth Level: %s*)\n\n" % auth_level
        func.__doc__ = (banner + original_doc) if original_doc else banner
        return func

    if type(auth_level) is not FunctionType:
        # Called as @export(level): hand back the real decorator.
        return wrap

    # Called as bare @export: the "auth_level" argument is the function.
    target = auth_level
    auth_level = AUTH_LEVEL_DEFAULT
    return wrap(target)

 

 

def format_request(call):
    """
    Format the RPCRequest message for debug printing

    The request is rendered like a function call, e.g. ``method(1, 2, key=3)``.

    :param call: the request; index 1 is the method name, index 2 the
        positional arguments and index 3 the keyword arguments
    :type call: a RPCRequest

    :returns: a formatted string for printing
    :rtype: str

    """
    try:
        s = call[1] + "("
        if call[2]:
            s += ", ".join([str(x) for x in call[2]])
        if call[3]:
            if call[2]:
                s += ", "
            s += ", ".join([key + "=" + str(value) for key, value in call[3].items()])
        return s + ")"
    except UnicodeEncodeError:
        # Wrap `call` in a 1-tuple: a request is itself a tuple, and passing it
        # directly to `%` would be treated as several format arguments and
        # raise TypeError instead of producing this fallback text.
        return "UnicodeEncodeError, call: %s" % (call,)

 

 

class ServerContextFactory(object):
    """Builds the SSL context handed to ``reactor.listenSSL`` by the RPC server."""

    def getContext(self):  # NOQA
        """
        Create an SSL context.

        This loads the servers cert/private key SSL files for use with the
        SSL transport.

        :returns: a context configured with ``daemon.cert`` and ``daemon.pkey``
            from the ``ssl`` config directory
        :rtype: OpenSSL.SSL.Context
        """
        ssl_dir = deluge.configmanager.get_config_dir("ssl")
        # SSLv3 is broken (POODLE) and absent from current OpenSSL builds.
        # SSLv23_METHOD negotiates the best protocol both peers support; the
        # obsolete SSLv2/SSLv3 versions are then explicitly switched off.
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
        ctx.use_certificate_file(os.path.join(ssl_dir, "daemon.cert"))
        ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey"))
        return ctx

 

 

class DelugeRPCProtocol(DelugeTransferProtocol):
    """
    Per-connection protocol of the RPC server.

    Each instance tracks its session in the dictionaries stored on the shared
    factory (``authorized_sessions``, ``session_protocols`` and
    ``interested_events``), keyed by ``self.transport.sessionno``.
    """

    def message_received(self, request):
        """
        This method is called whenever a message is received from a client.  The
        only message that a client sends to the server is a RPC Request message.
        If the RPC Request message is valid, then the method is called in
        :meth:`dispatch`.

        :param request: the request from the client; a tuple of calls, each
            call being a 4-item sequence ``(request_id, method, args, kwargs)``
        :type request: tuple

        """
        if not isinstance(request, tuple):
            log.debug("Received invalid message: type is not tuple")
            return

        if len(request) < 1:
            log.debug("Received invalid message: there are no items")
            return

        for call in request:
            if len(call) != 4:
                log.debug("Received invalid rpc request: number of items "
                          "in request is %s", len(call))
                continue
            # log.debug("RPCRequest: %s", format_request(call))
            # Schedule the call instead of running it inline so every call in
            # the batch is dispatched from the reactor loop.
            reactor.callLater(0, self.dispatch, *call)

    def sendData(self, data):  # NOQA
        """
        Sends the data to the client.

        :param data: the object that is to be sent to the client.  This should
            be one of the RPC message types.
        :type data: object

        """
        self.transfer_message(data)

    def connectionMade(self):  # NOQA
        """
        This method is called when a new client connects.
        """
        peer = self.transport.getPeer()
        log.info("Deluge Client connection made from: %s:%s",
                 peer.host, peer.port)
        # Set the initial auth level of this session to AUTH_LEVEL_NONE
        self.factory.authorized_sessions[self.transport.sessionno] = AUTH_LEVEL_NONE

    def connectionLost(self, reason):  # NOQA
        """
        This method is called when the client is disconnected.

        :param reason: the reason the client disconnected; its ``value``
            attribute is logged
        :type reason: twisted.python.failure.Failure

        """

        # We need to remove this session from various dicts
        del self.factory.authorized_sessions[self.transport.sessionno]
        if self.transport.sessionno in self.factory.session_protocols:
            del self.factory.session_protocols[self.transport.sessionno]
        if self.transport.sessionno in self.factory.interested_events:
            del self.factory.interested_events[self.transport.sessionno]

        log.info("Deluge client disconnected: %s", reason.value)

    def valid_session(self):
        """
        Checks whether this connection is still tracked by the factory.

        :returns: True if the session number is in ``authorized_sessions``
        :rtype: bool
        """
        return self.transport.sessionno in self.factory.authorized_sessions

    def dispatch(self, request_id, method, args, kwargs):
        """
        This method is run when a RPC Request is made.  It will run the local method
        and will send either a RPC Response or RPC Error back to the client.

        Requests for a method that is neither one of the ``daemon.*`` special
        cases nor registered on the factory are ignored without a reply.

        :param request_id: the request_id from the client (sent in the RPC Request)
        :type request_id: int
        :param method: the local method to call. It must be registered with
            the :class:`RPCServer`.
        :type method: str
        :param args: the arguments to pass to `method`
        :type args: list
        :param kwargs: the keyword-arguments to pass to `method`
        :type kwargs: dict

        """
        def send_error():
            """
            Sends an error response with the contents of the exception that was raised.

            Must be called while an exception is being handled, since the
            details are read from ``sys.exc_info()``.
            """
            exceptionType, exceptionValue = sys.exc_info()[:2]
            formated_tb = traceback.format_exc()
            try:
                self.sendData((
                    RPC_ERROR,
                    request_id,
                    exceptionType.__name__,
                    exceptionValue._args,
                    exceptionValue._kwargs,
                    formated_tb
                ))
            except AttributeError:
                # This is not a deluge exception (object has no attribute '_args), let's wrap it
                log.error("An exception occurred while sending RPC_ERROR to "
                          "client. Wrapping it and resending. Error to "
                          "send(causing exception goes next):\n%s", formated_tb)
                try:
                    raise WrappedException(str(exceptionValue), exceptionType.__name__, formated_tb)
                except Exception:
                    # Re-enter with the WrappedException as the active exception.
                    send_error()
            except Exception as err:
                log.error("An exception occurred while sending RPC_ERROR to client: %s", err)

        if method == "daemon.info":
            # This is a special case and used in the initial connection process
            self.sendData((RPC_RESPONSE, request_id, deluge.common.get_version()))
            return
        elif method == "daemon.login":
            # This is a special case and used in the initial connection process
            # We need to authenticate the user here
            log.debug("RPC dispatch daemon.login")
            try:
                client_version = kwargs.pop('client_version', None)
                if client_version is None:
                    raise IncompatibleClient(deluge.common.get_version())
                ret = component.get("AuthManager").authorize(*args, **kwargs)
                if ret:
                    # Remember (auth_level, username) and the protocol of this session
                    self.factory.authorized_sessions[self.transport.sessionno] = (ret, args[0])
                    self.factory.session_protocols[self.transport.sessionno] = self
            except Exception as ex:
                send_error()
                if not isinstance(ex, _ClientSideRecreateError):
                    log.exception(ex)
            else:
                self.sendData((RPC_RESPONSE, request_id, ret))
                if not ret:
                    self.transport.loseConnection()
            finally:
                # Login requests never fall through to the generic dispatch below.
                return
        elif method == "daemon.set_event_interest" and self.valid_session():
            log.debug("RPC dispatch daemon.set_event_interest")
            # This special case is to allow clients to set which events they are
            # interested in receiving.
            # We are expecting a sequence from the client.
            try:
                if self.transport.sessionno not in self.factory.interested_events:
                    self.factory.interested_events[self.transport.sessionno] = []
                self.factory.interested_events[self.transport.sessionno].extend(args[0])
            except Exception:
                send_error()
            else:
                self.sendData((RPC_RESPONSE, request_id, True))
            finally:
                return

        if method in self.factory.methods and self.valid_session():
            log.debug("RPC dispatch %s", method)
            try:
                method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level
                # NOTE(review): before a successful login the stored value is the
                # bare AUTH_LEVEL_NONE set in connectionMade (presumably an int),
                # not an (auth_level, username) tuple, so this subscript would
                # raise and the client gets a wrapped error -- confirm intended.
                auth_level = self.factory.authorized_sessions[self.transport.sessionno][0]
                if auth_level < method_auth_requirement:
                    # This session is not allowed to call this method
                    log.debug("Session %s is trying to call a method it is not "
                              "authorized to call!", self.transport.sessionno)
                    raise NotAuthorizedError(auth_level, method_auth_requirement)
                # Set the session_id in the factory so that methods can know
                # which session is calling it.
                self.factory.session_id = self.transport.sessionno
                ret = self.factory.methods[method](*args, **kwargs)
            except Exception as ex:
                send_error()
                # Don't bother printing out DelugeErrors, because they are just
                # for the client
                if not isinstance(ex, DelugeError):
                    log.exception("Exception calling RPC request: %s", ex)
            else:
                # Check if the return value is a deferred, since we'll need to
                # wait for it to fire before sending the RPC_RESPONSE
                if isinstance(ret, defer.Deferred):
                    def on_success(result):
                        self.sendData((RPC_RESPONSE, request_id, result))
                        return result

                    def on_fail(failure):
                        # Re-raise so send_error() can read it from sys.exc_info()
                        try:
                            failure.raiseException()
                        except Exception:
                            send_error()
                        return failure

                    ret.addCallbacks(on_success, on_fail)
                else:
                    self.sendData((RPC_RESPONSE, request_id, ret))

 

 

class RPCServer(component.Component):
    """
    This class is used to handle rpc requests from the client.  Objects are
    registered with this class and their methods are exported using the export
    decorator.

    :param port: the port the RPCServer will listen on
    :type port: int
    :param interface: the interface to listen on, this may override the `allow_remote` setting
    :type interface: str
    :param allow_remote: set True if the server should allow remote connections
    :type allow_remote: bool
    :param listen: if False, will not start listening.. This is only useful in Classic Mode
    :type listen: bool
    """

    def __init__(self, port=58846, interface="", allow_remote=False, listen=True):
        component.Component.__init__(self, "RPCServer")

        self.factory = Factory()
        self.factory.protocol = DelugeRPCProtocol
        self.factory.session_id = -1

        # Holds the registered methods
        self.factory.methods = {}
        # Holds the session_ids and auth levels
        self.factory.authorized_sessions = {}
        # Holds the protocol objects with the session_id as key
        self.factory.session_protocols = {}
        # Holds the interested event list for the sessions
        self.factory.interested_events = {}

        self.listen = listen
        if not listen:
            return

        # An empty hostname listens on all interfaces
        if allow_remote:
            hostname = ""
        else:
            hostname = "localhost"

        # An explicit interface wins over allow_remote
        if interface:
            hostname = interface

        log.info("Starting DelugeRPC server %s:%s", hostname, port)

        # Check for SSL keys and generate some if needed
        check_ssl_keys()

        try:
            reactor.listenSSL(port, self.factory, ServerContextFactory(), interface=hostname)
        except Exception as ex:
            log.info("Daemon already running or port not available..")
            log.error(ex)
            sys.exit(0)

    def register_object(self, obj, name=None):
        """
        Registers an object to export it's rpc methods.  These methods should
        be exported with the export decorator prior to registering the object.

        :param obj: the object that we want to export
        :type obj: object
        :param name: the name to use, if None, it will be the class name of the object
        :type name: str
        """
        if not name:
            name = obj.__class__.__name__.lower()

        for d in dir(obj):
            # Private/dunder attributes are never exported
            if d[0] == "_":
                continue
            if getattr(getattr(obj, d), '_rpcserver_export', False):
                log.debug("Registering method: %s", name + "." + d)
                self.factory.methods[name + "." + d] = getattr(obj, d)

    def deregister_object(self, obj):
        """
        Deregisters an objects exported rpc methods.

        :param obj: the object that was previously registered

        """
        # Iterate over a snapshot: deleting from a dict while iterating its
        # live items view raises RuntimeError on Python 3.
        for key, value in list(self.factory.methods.items()):
            if value.__self__ == obj:
                del self.factory.methods[key]

    def get_object_method(self, name):
        """
        Returns a registered method.

        :param name: the name of the method, usually in the form of 'object.method'
        :type name: str

        :returns: method

        :raises KeyError: if `name` is not registered

        """
        return self.factory.methods[name]

    def get_method_list(self):
        """
        Returns a list of the exported methods.

        :returns: the exported methods
        :rtype: list
        """
        # list() so callers get a real list (not a dict view) on Python 3 too
        return list(self.factory.methods.keys())

    def get_session_id(self):
        """
        Returns the session id of the current RPC.

        :returns: the session id, this will be -1 if no connections have been made
        :rtype: int

        """
        return self.factory.session_id

    def get_session_user(self):
        """
        Returns the username calling the current RPC.

        :returns: the username of the user calling the current RPC,
            "localclient" when the server is not listening, or an empty
            string when there is no tracked session
        :rtype: string

        """
        if not self.listen:
            return "localclient"
        session_id = self.get_session_id()
        if session_id > -1 and session_id in self.factory.authorized_sessions:
            return self.factory.authorized_sessions[session_id][1]
        else:
            # No connections made yet
            return ""

    def get_session_auth_level(self):
        """
        Returns the auth level of the user calling the current RPC.

        AUTH_LEVEL_ADMIN is returned when the server is not listening or the
        current session id is not tracked.

        :returns: the auth level
        :rtype: int
        """
        if not self.listen or not self.is_session_valid(self.get_session_id()):
            return AUTH_LEVEL_ADMIN
        return self.factory.authorized_sessions[self.get_session_id()][0]

    def get_rpc_auth_level(self, rpc):
        """
        Returns the auth level requirement for an exported rpc.

        :param rpc: the name of the registered rpc method
        :type rpc: str

        :returns: the auth level
        :rtype: int

        :raises KeyError: if `rpc` is not registered
        """
        return self.factory.methods[rpc]._rpcserver_auth_level

    def is_session_valid(self, session_id):
        """
        Checks if the session is still valid, eg, if the client is still connected.

        :param session_id: the session id
        :type session_id: int

        :returns: True if the session is valid
        :rtype: bool

        """
        return session_id in self.factory.authorized_sessions

    def emit_event(self, event):
        """
        Emits the event to interested clients.

        :param event: the event to emit
        :type event: :class:`deluge.event.DelugeEvent`
        """
        log.debug("intevents: %s", self.factory.interested_events)
        # Find sessions interested in this event
        for session_id, interest in self.factory.interested_events.items():
            if event.name not in interest:
                continue
            # A session can register interest before it has logged in, while
            # its protocol is only stored at login; skip it rather than let a
            # KeyError abort delivery to the remaining sessions.
            protocol = self.factory.session_protocols.get(session_id)
            if protocol is None:
                log.debug("Session ID %s has no registered protocol. Not sending event \"%s\".",
                          session_id, event.name)
                continue
            log.debug("Emit Event: %s %s", event.name, event.args)
            # This session is interested so send a RPC_EVENT
            protocol.sendData((RPC_EVENT, event.name, event.args))

    def emit_event_for_session_id(self, session_id, event):
        """
        Emits the event to specified session_id.

        Nothing is sent if the session is not valid, has not registered
        interest in the event, or has no protocol stored for it.

        :param session_id: the session to send the event to
        :type session_id: int
        :param event: the event to emit
        :type event: :class:`deluge.event.DelugeEvent`
        """
        if not self.is_session_valid(session_id):
            log.debug("Session ID %s is not valid. Not sending event \"%s\".", session_id, event.name)
            return
        if session_id not in self.factory.interested_events:
            log.debug("Session ID %s is not interested in any events. Not sending event \"%s\".",
                      session_id, event.name)
            return
        if event.name not in self.factory.interested_events[session_id]:
            log.debug("Session ID %s is not interested in event \"%s\". Not sending it.", session_id, event.name)
            return
        # The protocol is only stored once the session has logged in.
        protocol = self.factory.session_protocols.get(session_id)
        if protocol is None:
            log.debug("Session ID %s has no registered protocol. Not sending event \"%s\".",
                      session_id, event.name)
            return
        log.debug("Sending event \"%s\" with args \"%s\" to session id \"%s\".",
                  event.name, event.args, session_id)
        protocol.sendData((RPC_EVENT, event.name, event.args))

 

 

def check_ssl_keys():
    """
    Make sure the daemon's SSL private key and certificate exist.

    The ``ssl`` config directory is created when missing, and a fresh
    key/cert pair is generated whenever the directory had to be created or
    either ``daemon.pkey`` or ``daemon.cert`` is absent from it.
    """
    ssl_dir = deluge.configmanager.get_config_dir("ssl")

    if os.path.exists(ssl_dir):
        required = ("daemon.pkey", "daemon.cert")
        # any() stops at the first missing file, so one pair is generated at most
        incomplete = any(
            not os.path.exists(os.path.join(ssl_dir, filename))
            for filename in required
        )
        if incomplete:
            generate_ssl_keys()
        return

    # The ssl folder doesn't exist so we need to create it
    os.makedirs(ssl_dir)
    generate_ssl_keys()

 

 

def generate_ssl_keys():
    """
    This method generates a new SSL key/cert.

    A 2048-bit RSA key and a self-signed certificate valid for five years
    (subject CN "Deluge Daemon") are written as PEM to ``daemon.pkey`` and
    ``daemon.cert`` in the ``ssl`` config directory, which must already exist.
    Both files are then restricted to owner read/write.
    """
    # MD5 signatures and 1024-bit RSA keys are considered broken and are
    # rejected by current TLS stacks, so use SHA-256 with a 2048-bit key.
    digest = "sha256"
    # Generate key pair
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    # Generate cert request
    req = crypto.X509Req()
    subj = req.get_subject()
    setattr(subj, "CN", "Deluge Daemon")
    req.set_pubkey(pkey)
    req.sign(pkey, digest)

    # Generate certificate (self-signed: issuer and subject are the same)
    cert = crypto.X509()
    cert.set_serial_number(0)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60 * 60 * 24 * 365 * 5)  # Five Years
    cert.set_issuer(req.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(pkey, digest)

    # Write out files.  The dump_* helpers return bytes, so open in binary
    # mode, and use `with` so the handles are flushed and closed before chmod.
    ssl_dir = deluge.configmanager.get_config_dir("ssl")
    with open(os.path.join(ssl_dir, "daemon.pkey"), "wb") as pkey_file:
        pkey_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    with open(os.path.join(ssl_dir, "daemon.cert"), "wb") as cert_file:
        cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    # Make the files only readable by this user
    for f in ("daemon.pkey", "daemon.cert"):
        os.chmod(os.path.join(ssl_dir, f), stat.S_IREAD | stat.S_IWRITE)