Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

import time 

 

from twisted.internet.defer import maybeDeferred, succeed 

from twisted.trial import unittest 

 

import deluge.component as component 

import deluge.ui.sessionproxy 

 

 

class Core(object): 

    def __init__(self): 

        self.reset() 

 

    def reset(self): 

        self.torrents = {} 

        self.torrents["a"] = {"key1": 1, "key2": 2, "key3": 3} 

        self.torrents["b"] = {"key1": 1, "key2": 2, "key3": 3} 

        self.torrents["c"] = {"key1": 1, "key2": 2, "key3": 3} 

        self.prev_status = {} 

 

    def get_session_state(self): 

        return maybeDeferred(self.torrents.keys) 

 

    def get_torrent_status(self, torrent_id, keys, diff=False): 

26        if not keys: 

            keys = self.torrents[torrent_id].keys() 

 

29        if not diff: 

            ret = {} 

            for key in keys: 

                ret[key] = self.torrents[torrent_id][key] 

 

            return succeed(ret) 

 

        else: 

            ret = {} 

42            if torrent_id in self.prev_status: 

                for key in keys: 

                    if self.prev_status[torrent_id][key] != self.torrents[torrent_id][key]: 

                        ret[key] = self.torrents[torrent_id][key] 

            else: 

                ret = self.torrents[torrent_id] 

            self.prev_status[torrent_id] = dict(self.torrents[torrent_id]) 

            return succeed(ret) 

 

    def get_torrents_status(self, filter_dict, keys, diff=False): 

48        if not filter_dict: 

            filter_dict["id"] = self.torrents.keys() 

50        if not keys: 

            keys = self.torrents["a"].keys() 

52        if not diff: 

            if "id" in filter_dict: 

                torrents = filter_dict["id"] 

                ret = {} 

                for torrent in torrents: 

                    ret[torrent] = {} 

                    for key in keys: 

                        ret[torrent][key] = self.torrents[torrent][key] 

                return succeed(ret) 

        else: 

exit            if "id" in filter_dict: 

                torrents = filter_dict["id"] 

                ret = {} 

                for torrent in torrents: 

                    ret[torrent] = {} 

                    if torrent in self.prev_status: 

                        for key in self.prev_status[torrent]: 

                            if self.prev_status[torrent][key] != self.torrents[torrent][key]: 

                                ret[torrent][key] = self.torrents[torrent][key] 

                    else: 

                        ret[torrent] = dict(self.torrents[torrent]) 

 

                    self.prev_status[torrent] = dict(self.torrents[torrent]) 

                return succeed(ret) 

 

 

class Client(object): 

    def __init__(self): 

        self.core = Core() 

 

    def __noop__(self, *args, **kwargs): 

        return None 

 

    def __getattr__(self, *args, **kwargs): 

        return self.__noop__ 

 

client = Client() 

 

deluge.ui.sessionproxy.client = client 

 

 

class SessionProxyTestCase(unittest.TestCase): 

    def setUp(self):  # NOQA 

        self.sp = deluge.ui.sessionproxy.SessionProxy() 

        client.core.reset() 

        d = self.sp.start() 

 

        def do_get_torrents_status(torrent_ids): 

            inital_keys = ['key1'] 

            self.sp.get_torrents_status({'id': torrent_ids}, inital_keys) 

        d.addCallback(do_get_torrents_status) 

        return d 

 

    def tearDown(self):  # NOQA 

        return component.deregister(self.sp) 

 

    def test_startup(self): 

        self.assertEquals(client.core.torrents["a"], self.sp.torrents["a"][1]) 

 

    def test_get_torrent_status_no_change(self): 

        d = self.sp.get_torrent_status("a", []) 

        d.addCallback(self.assertEquals, client.core.torrents["a"]) 

        return d 

 

    def test_get_torrent_status_change_with_cache(self): 

        client.core.torrents["a"]["key1"] = 2 

        d = self.sp.get_torrent_status("a", ["key1"]) 

        d.addCallback(self.assertEquals, {"key1": 1}) 

        return d 

 

    def test_get_torrent_status_change_without_cache(self): 

        client.core.torrents["a"]["key1"] = 2 

        time.sleep(self.sp.cache_time + 0.1) 

        d = self.sp.get_torrent_status("a", []) 

        d.addCallback(self.assertEquals, client.core.torrents["a"]) 

        return d 

 

    def test_get_torrent_status_key_not_updated(self): 

        time.sleep(self.sp.cache_time + 0.1) 

        self.sp.get_torrent_status("a", ["key1"]) 

        client.core.torrents["a"]["key2"] = 99 

        d = self.sp.get_torrent_status("a", ["key2"]) 

        d.addCallback(self.assertEquals, {"key2": 99}) 

        return d 

 

    def test_get_torrents_status_key_not_updated(self): 

        time.sleep(self.sp.cache_time + 0.1) 

        self.sp.get_torrents_status({"id": ["a"]}, ["key1"]) 

        client.core.torrents["a"]["key2"] = 99 

        d = self.sp.get_torrents_status({"id": ["a"]}, ["key2"]) 

        d.addCallback(self.assertEquals, {"a": {"key2": 99}}) 

        return d