篇首语:本文由编程笔记#小编为大家整理,主要介绍了ruby 状态模式灯泡相关的知识,希望对你有一定的参考价值。
class WebSocketState(object):
    """Base class of the per-state handlers used by the WebSocket handshake.

    The subclasses in this file each expose a ``state`` name and a
    ``connect`` step; ``factory`` maps a state name to a fresh handler.
    """

    @staticmethod
    def factory(state):
        """Return a new handler object for the state called *state*.

        Raises:
            ValueError: if *state* is not a known state name. An unknown
                name used to fall through and return ``None``, which only
                failed later when the missing handler was used.
        """
        # The handler classes are defined after this base class, so they are
        # resolved lazily, one branch at a time.
        if state == "new":
            return WebSocketNewState()
        if state == "ssl_handshake":
            return WebSocketSSLHandshakeState()
        if state == "headers":
            return WebSocketHeadersState()
        if state == "send_headers":
            return WebSocketSendHeadersState()
        if state == "response":
            return WebSocketResponseState()
        if state == "done":
            return WebSocketDoneState()
        if state == "flush":
            return WebSocketFlushState()
        raise ValueError("Unknown WebSocket state %r" % (state,))
class WebSocketNewState(WebSocketState):
    """Initial state: no socket has been opened yet."""

    @property
    def state(self):
        """Name of this state (``"new"``)."""
        return "new"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Open the connection described by *uri* and advance the state.

        Marks *web_socket* as a client, parses *uri*, connects to its host
        (port 80 for ``ws``/``http`` and 443 for ``wss``/``https`` when the
        URI names no port) and stores the socket on ``web_socket.socket``.
        Secure schemes wrap the socket for TLS and move to
        ``"ssl_handshake"``; plain schemes move straight to ``"headers"``.
        *origin* and *protocols* are accepted for signature parity with the
        other states and are not used here.

        Raises:
            Exception: if the URI scheme is not one of the four above.
        """
        web_socket.client = True
        uri = urlparse(uri)
        port = uri.port
        if uri.scheme in ("ws", "http"):
            secure = False
        elif uri.scheme in ("wss", "https"):
            secure = True
        else:
            raise Exception("Unknown scheme '%s'" % uri.scheme)
        if not port:
            port = 443 if secure else 80

        sock = socket.create_connection((uri.hostname, port))
        if secure:
            # ssl.wrap_socket() was removed in Python 3.12; an SSLContext is
            # the supported API and also verifies the server certificate.
            try:
                context = ssl.create_default_context()
                sock = context.wrap_socket(sock, server_hostname=uri.hostname)
            except Exception:
                # Do not leak the TCP socket when TLS setup fails.
                sock.close()
                raise
        web_socket.socket = sock
        web_socket.state = "ssl_handshake" if secure else "headers"
class WebSocketSSLHandshakeState(WebSocketState):
    """State entered once the socket has been wrapped for TLS."""

    @property
    def state(self):
        """Name of this state (``"ssl_handshake"``)."""
        return "ssl_handshake"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Run the TLS handshake on the socket, then move to ``"headers"``.

        *uri*, *origin* and *protocols* are not used by this step.
        """
        tls_socket = web_socket.socket
        tls_socket.do_handshake()
        web_socket.state = "headers"
class WebSocketHeadersState(WebSocketState):
    """State that queues the client's HTTP upgrade request."""

    @property
    def state(self):
        """Name of this state (``"headers"``)."""
        return "headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Queue the opening-handshake request and move to ``"send_headers"``.

        Generates a fresh 16-byte key, stores its base64 form on
        ``web_socket._key`` and queues the request line and headers through
        ``web_socket._queue_str``. ``Origin`` is only sent when *origin* is
        not ``None`` and ``Sec-WebSocket-Protocol`` only when *protocols* is
        non-empty.

        *uri* may be a URI string or an already parsed result. Each state's
        ``connect`` is called with the caller's original argument, so the
        raw string has to be parsed here before ``path`` and ``hostname``
        can be read.
        """
        if not hasattr(uri, "path"):
            uri = urlparse(uri)

        # 16 random byte values, then base64; works on Python 2 and 3 alike.
        nonce = bytes(bytearray(random.randrange(256) for _ in range(16)))
        web_socket._key = b64encode(nonce).decode("ascii")

        path = uri.path or "/"
        web_socket._queue_str("GET %s HTTP/1.1\r\n" % path)
        web_socket._queue_str("Host: %s\r\n" % uri.hostname)
        web_socket._queue_str("Upgrade: websocket\r\n")
        web_socket._queue_str("Connection: upgrade\r\n")
        web_socket._queue_str("Sec-WebSocket-Key: %s\r\n" % web_socket._key)
        web_socket._queue_str("Sec-WebSocket-Version: 13\r\n")
        if origin is not None:
            web_socket._queue_str("Origin: %s\r\n" % origin)
        if len(protocols) > 0:
            web_socket._queue_str(
                "Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))
        # A blank line terminates the header section.
        web_socket._queue_str("\r\n")
        web_socket.state = "send_headers"
class WebSocketSendHeadersState(WebSocketState):
    """State in which the queued request is written to the socket."""

    @property
    def state(self):
        """Name of this state (``"send_headers"``)."""
        return "send_headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Flush the queued data, then move to ``"response"``.

        *uri*, *origin* and *protocols* are not used by this step.
        """
        flush = web_socket._flush
        flush()
        web_socket.state = "response"
class WebSocketResponseState(WebSocketState):
    """State that reads and validates the server's handshake response."""

    @property
    def state(self):
        """Name of this state (``"response"``)."""
        return "response"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Validate the server's reply and move to ``"done"``.

        Reads from the socket via ``web_socket._recv()``, waits until the
        full header block is buffered, then checks the status line, the
        ``Upgrade`` header, the ``Sec-WebSocket-Accept`` digest and the
        negotiated sub-protocol. The chosen protocol is stored on
        ``web_socket.protocol``. *uri* and *origin* are not used here.

        Raises:
            WebSocketWantReadError: the header block is not complete yet.
            Exception: the socket closed, or the response is invalid.
        """
        if not web_socket._recv():
            raise Exception("Socket closed unexpectedly")

        if b"\r\n\r\n" not in web_socket._recv_buffer:
            raise WebSocketWantReadError

        # Status line first; the rest of the buffer holds the headers.
        (request, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            b"\r\n", 1)
        request = request.decode("latin-1")

        words = request.split()
        if (len(words) < 2) or (words[0] != "HTTP/1.1"):
            raise Exception("Invalid response")
        if words[1] != "101":
            raise Exception("WebSocket request denied: %s" %
                            " ".join(words[1:]))

        (headers, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            b"\r\n\r\n", 1)
        headers = headers.decode('latin-1') + '\r\n'
        headers = email.message_from_string(headers)

        if headers.get("Upgrade", "").lower() != "websocket":
            raise Exception("Missing or incorrect upgrade header")

        accept = headers.get('Sec-WebSocket-Accept')
        if accept is None:
            raise Exception("Missing Sec-WebSocket-Accept header")

        # The server must echo SHA-1(key + GUID), base64 encoded.
        expected = sha1(
            (web_socket._key + web_socket.GUID).encode("ascii")).digest()
        expected = b64encode(expected).decode("ascii")

        # The key is single-use; drop it before reporting the outcome.
        del web_socket._key

        if accept != expected:
            raise Exception("Invalid Sec-WebSocket-Accept header")

        web_socket.protocol = headers.get('Sec-WebSocket-Protocol')
        if len(protocols) == 0:
            if web_socket.protocol is not None:
                raise Exception("Unexpected Sec-WebSocket-Protocol header")
        else:
            if web_socket.protocol not in protocols:
                raise Exception("Invalid protocol chosen by server")

        web_socket.state = "done"
class WebSocketDoneState(WebSocketState):
    """State named ``"done"``; connecting again from it is an error."""

    @property
    def state(self):
        """Name of this state (``"done"``)."""
        return "done"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Always raise: there is no connect step in this state."""
        message = "WebSocket is in an invalid state"
        raise Exception(message)
class WebSocketFlushState(WebSocketState):
    """State named ``"flush"``; connecting from it is an error."""

    @property
    def state(self):
        """Name of this state (``"flush"``)."""
        return "flush"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Always raise: there is no connect step in this state."""
        message = "WebSocket is in an invalid state"
        raise Exception(message)
class WebSocket(object):
    """Context object that delegates ``connect`` to its current state."""

    @property
    def state_obj(self):
        """The current state handler object."""
        return self._state

    @property
    def state(self):
        """Name of the current state, as reported by the handler."""
        return self._state.state

    @state.setter
    def state(self, value):
        # Assigning a state name swaps in the matching handler object.
        self._state = WebSocketState.factory(value)

    @state.deleter
    def state(self):  # again, name must be the same
        del self._state

    def connect(self, uri, origin=None, protocols=[]):
        """Run the current state's ``connect`` step once.

        The caller's *origin* and *protocols* are forwarded unchanged.
        They used to be replaced by ``None`` and ``[]``, so the Origin and
        Sec-WebSocket-Protocol values never reached the state handlers.
        """
        self.state_obj.connect(self, uri, origin=origin, protocols=protocols)
class WebSocket(object):
    def connect(self, uri, origin=None, protocols=[]):
        """Establishes a new connection to a WebSocket server.

        This method connects to the host specified by uri and
        negotiates a WebSocket connection. origin should be specified
        in accordance with RFC 6454 if known. A list of valid
        sub-protocols can be specified in the protocols argument.

        The data will be sent in the clear if the "ws" scheme is used,
        and encrypted if the "wss" scheme is used.

        Both WebSocketWantReadError and WebSocketWantWriteError can be
        raised whilst negotiating the connection. Repeated calls to
        connect() must retain the same arguments.

        Raises:
            Exception: for an unknown URI scheme, an invalid or rejected
                server response, or when called in a state that is not
                part of the handshake.
        """
        self.client = True

        uri = urlparse(uri)
        port = uri.port
        if uri.scheme in ("ws", "http"):
            if not port:
                port = 80
        elif uri.scheme in ("wss", "https"):
            if not port:
                port = 443
        else:
            raise Exception("Unknown scheme '%s'" % uri.scheme)

        # This is a state machine in order to handle
        # WantRead/WantWrite events. The checks fall through on purpose so
        # that one call advances as far as the socket allows.
        if self._state == "new":
            sock = socket.create_connection((uri.hostname, port))
            if uri.scheme in ("wss", "https"):
                # ssl.wrap_socket() was removed in Python 3.12; an
                # SSLContext is the supported API and verifies the server.
                try:
                    context = ssl.create_default_context()
                    sock = context.wrap_socket(
                        sock, server_hostname=uri.hostname)
                except Exception:
                    # Do not leak the TCP socket when TLS setup fails.
                    sock.close()
                    raise
                self.socket = sock
                self._state = "ssl_handshake"
            else:
                self.socket = sock
                self._state = "headers"

        if self._state == "ssl_handshake":
            self.socket.do_handshake()
            self._state = "headers"

        if self._state == "headers":
            # 16 random byte values, base64 encoded (Python 2 and 3).
            nonce = bytes(bytearray(random.randrange(256) for _ in range(16)))
            self._key = b64encode(nonce).decode("ascii")

            path = uri.path
            if not path:
                path = "/"

            self._queue_str("GET %s HTTP/1.1\r\n" % path)
            self._queue_str("Host: %s\r\n" % uri.hostname)
            self._queue_str("Upgrade: websocket\r\n")
            self._queue_str("Connection: upgrade\r\n")
            self._queue_str("Sec-WebSocket-Key: %s\r\n" % self._key)
            self._queue_str("Sec-WebSocket-Version: 13\r\n")

            if origin is not None:
                self._queue_str("Origin: %s\r\n" % origin)
            if len(protocols) > 0:
                self._queue_str(
                    "Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))

            self._queue_str("\r\n")

            self._state = "send_headers"

        if self._state == "send_headers":
            self._flush()
            self._state = "response"

        if self._state == "response":
            if not self._recv():
                raise Exception("Socket closed unexpectedly")

            if b"\r\n\r\n" not in self._recv_buffer:
                raise WebSocketWantReadError

            # Status line first; the rest of the buffer holds the headers.
            (request, self._recv_buffer) = self._recv_buffer.split(b"\r\n", 1)
            request = request.decode("latin-1")

            words = request.split()
            if (len(words) < 2) or (words[0] != "HTTP/1.1"):
                raise Exception("Invalid response")
            if words[1] != "101":
                raise Exception(
                    "WebSocket request denied: %s" % " ".join(words[1:]))

            (headers, self._recv_buffer) = self._recv_buffer.split(
                b"\r\n\r\n", 1)
            headers = headers.decode('latin-1') + '\r\n'
            headers = email.message_from_string(headers)

            if headers.get("Upgrade", "").lower() != "websocket":
                raise Exception("Missing or incorrect upgrade header")

            accept = headers.get('Sec-WebSocket-Accept')
            if accept is None:
                raise Exception("Missing Sec-WebSocket-Accept header")

            # The server must echo SHA-1(key + GUID), base64 encoded.
            expected = sha1((self._key + self.GUID).encode("ascii")).digest()
            expected = b64encode(expected).decode("ascii")

            # The key is single-use; drop it before reporting the outcome.
            del self._key

            if accept != expected:
                raise Exception("Invalid Sec-WebSocket-Accept header")

            self.protocol = headers.get('Sec-WebSocket-Protocol')
            if len(protocols) == 0:
                if self.protocol is not None:
                    raise Exception("Unexpected Sec-WebSocket-Protocol header")
            else:
                if self.protocol not in protocols:
                    raise Exception("Invalid protocol chosen by server")

            self._state = "done"

            return

        raise Exception("WebSocket is in an invalid state")
# Add in the DimmedState
# Context object: every action is delegated to the current state object.
class LightBulb
  # The current state object, as built by LightBulbState.for.
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Returns whatever the current state's +turn_on+ returns.
  def turn_on
    state.turn_on(self)
  end

  # Returns whatever the current state's +turn_off+ returns.
  def turn_off
    state.turn_off(self)
  end

  # Returns whatever the current state's +dim+ returns.
  def dim
    state.dim(self)
  end

  # Replaces the state object with the one LightBulbState.for builds.
  def state=(state)
    @state = LightBulbState.for(state)
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on, :off or :dimmed).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      when :dimmed then LightBulbDimmedState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# Behaviour of a bulb that is currently on.
class LightBulbOnState
  def initialize
    @state = :on
  end

  # Already on, so nothing changes; reports false.
  def turn_on(light_bulb)
    false
  end

  # Moves the bulb to :off and reports true.
  def turn_off(light_bulb)
    transition(light_bulb, :off)
  end

  # Moves the bulb to :dimmed and reports true.
  def dim(light_bulb)
    transition(light_bulb, :dimmed)
  end

  private

  # Assigns +new_state+ to the bulb and reports the change as true.
  def transition(light_bulb, new_state)
    light_bulb.state = new_state
    true
  end
end
# Behaviour of a bulb that is currently off.
class LightBulbOffState
  def initialize
    @state = :off
  end

  # Moves the bulb to :on and reports true.
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  # Already off, so nothing changes; reports false.
  def turn_off(light_bulb)
    unchanged
  end

  # An off bulb is not dimmed; reports false.
  def dim(light_bulb)
    unchanged
  end

  private

  # Result used by the actions that leave the bulb as it is.
  def unchanged
    false
  end
end
# Behaviour of a bulb that is currently dimmed.
class LightBulbDimmedState
  def initialize
    @state = :dimmed
  end

  # Moves the bulb to :on and reports true.
  def turn_on(light_bulb)
    transition(light_bulb, :on)
  end

  # Moves the bulb to :off and reports true.
  def turn_off(light_bulb)
    transition(light_bulb, :off)
  end

  # Dimming a dimmed bulb moves it to :on and reports true.
  def dim(light_bulb)
    transition(light_bulb, :on)
  end

  private

  # Assigns +new_state+ to the bulb and reports the change as true.
  def transition(light_bulb, new_state)
    light_bulb.state = new_state
    true
  end
end
# Have state getter return the state object and delegate to that
# Context object: on/off actions are delegated to the current state object.
class LightBulb
  # The current state object, as built by LightBulbState.for.
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Returns whatever the current state's +turn_on+ returns.
  def turn_on
    state.turn_on(self)
  end

  # Returns whatever the current state's +turn_off+ returns.
  def turn_off
    state.turn_off(self)
  end

  # Replaces the state object with the one LightBulbState.for builds.
  def state=(state)
    @state = LightBulbState.for(state)
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# Behaviour of a bulb that is currently on.
class LightBulbOnState
  def initialize
    @state = :on
  end

  # Already on, so nothing changes; reports false.
  def turn_on(light_bulb)
    false
  end

  # Moves the bulb to :off and reports true.
  def turn_off(light_bulb)
    transition(light_bulb, :off)
  end

  private

  # Assigns +new_state+ to the bulb and reports the change as true.
  def transition(light_bulb, new_state)
    light_bulb.state = new_state
    true
  end
end
# Behaviour of a bulb that is currently off.
class LightBulbOffState
  def initialize
    @state = :off
  end

  # Moves the bulb to :on and reports true.
  def turn_on(light_bulb)
    transition(light_bulb, :on)
  end

  # Already off, so nothing changes; reports false.
  def turn_off(light_bulb)
    false
  end

  private

  # Assigns +new_state+ to the bulb and reports the change as true.
  def transition(light_bulb, new_state)
    light_bulb.state = new_state
    true
  end
end
# Delete Context's method bodies and leave delegation
# Context object: builds a state object for the current state name on each
# action and delegates to it.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Returns whatever the matching state object's +turn_on+ returns.
  def turn_on
    handler = LightBulbState.for(state)
    handler.turn_on(self)
  end

  # Returns whatever the matching state object's +turn_off+ returns.
  def turn_off
    handler = LightBulbState.for(state)
    handler.turn_off(self)
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Delegates to the state object's own +state+ reader.
  def state
    @state.state
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# Behaviour of a bulb that is currently on.
class LightBulbOnState
  # Symbol naming this state (:on). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :on
  end

  # Already on, so nothing changes; reports false.
  def turn_on(light_bulb)
    false
  end

  # Moves the bulb to :off and reports true.
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end
end
# Behaviour of a bulb that is currently off.
class LightBulbOffState
  # Symbol naming this state (:off). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :off
  end

  # Moves the bulb to :on and reports true.
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  # Already off, so nothing changes; reports false.
  def turn_off(light_bulb)
    false
  end
end
# delegate to State's method from Context's methods
# Context object: builds a state object for the current state name on each
# action and delegates to it.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Returns whatever the matching state object's +turn_on+ returns.
  # The old inline if/else that followed the delegating +return+ could
  # never run and has been removed.
  def turn_on
    LightBulbState.for(state).turn_on(self)
  end

  # Returns whatever the matching state object's +turn_off+ returns.
  # The old inline if/else that followed the delegating +return+ could
  # never run and has been removed.
  def turn_off
    LightBulbState.for(state).turn_off(self)
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Delegates to the state object's own +state+ reader.
  def state
    @state.state
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# Behaviour of a bulb that is currently on.
class LightBulbOnState
  # Symbol naming this state (:on). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :on
  end

  # Already on, so nothing changes; reports false.
  def turn_on(light_bulb)
    false
  end

  # Moves the bulb to :off and reports true.
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end
end
# Behaviour of a bulb that is currently off.
class LightBulbOffState
  # Symbol naming this state (:off). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :off
  end

  # Moves the bulb to :on and reports true.
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  # Already off, so nothing changes; reports false.
  def turn_off(light_bulb)
    false
  end
end
# Delete all except the required branch for each state
# Light bulb that keeps its state in a LightBulbState-built object.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Delegates to the state object's own +state+ reader.
  def state
    @state.state
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# Behaviour of a bulb that is currently on.
class LightBulbOnState
  # Symbol naming this state (:on). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :on
  end

  # Already on, so nothing changes; reports false.
  def turn_on(light_bulb)
    false
  end

  # Moves the bulb to :off and reports true.
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end
end
# Behaviour of a bulb that is currently off.
class LightBulbOffState
  # Symbol naming this state (:off). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :off
  end

  # Moves the bulb to :on and reports true.
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  # Already off, so nothing changes; reports false.
  def turn_off(light_bulb)
    false
  end
end
# Copy all of the code from the abstract state class into the concrete state
# Light bulb that keeps its state in a LightBulbState-built object.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Delegates to the state object's own +state+ reader.
  def state
    @state.state
  end
end
# Factory for the concrete state objects, plus the shared on/off logic.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end

  # Switches +light_bulb+ on; false when it is already on, true otherwise.
  def turn_on(light_bulb)
    return false if light_bulb.state == :on

    light_bulb.state = :on
    true
  end

  # Switches +light_bulb+ off; false when it is already off, true otherwise.
  def turn_off(light_bulb)
    return false if light_bulb.state == :off

    light_bulb.state = :off
    true
  end
end
# State object for :on; still carries a full copy of the shared on/off logic.
class LightBulbOnState
  # Symbol naming this state (:on). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :on
  end

  # Switches +light_bulb+ on; false when it is already on, true otherwise.
  def turn_on(light_bulb)
    return false if light_bulb.state == :on

    light_bulb.state = :on
    true
  end

  # Switches +light_bulb+ off; false when it is already off, true otherwise.
  def turn_off(light_bulb)
    return false if light_bulb.state == :off

    light_bulb.state = :off
    true
  end
end
# State object for :off; still carries a full copy of the shared on/off logic.
class LightBulbOffState
  # Symbol naming this state (:off). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :off
  end

  # Switches +light_bulb+ on; false when it is already on, true otherwise.
  def turn_on(light_bulb)
    return false if light_bulb.state == :on

    light_bulb.state = :on
    true
  end

  # Switches +light_bulb+ off; false when it is already off, true otherwise.
  def turn_off(light_bulb)
    return false if light_bulb.state == :off

    light_bulb.state = :off
    true
  end
end
# Allow method to accept a LightBulb instance as a parameter
# Light bulb that keeps its state in a LightBulbState-built object.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Delegates to the state object's own +state+ reader.
  def state
    @state.state
  end
end
# Factory for the concrete state objects, plus the shared on/off logic.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end

  # Switches +light_bulb+ on; false when it is already on, true otherwise.
  def turn_on(light_bulb)
    return false if light_bulb.state == :on

    light_bulb.state = :on
    true
  end

  # Switches +light_bulb+ off; false when it is already off, true otherwise.
  def turn_off(light_bulb)
    return false if light_bulb.state == :off

    light_bulb.state = :off
    true
  end
end
# State object representing a bulb that is switched on.
class LightBulbOnState
  # Symbol naming this state (:on). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :on
  end
end
# State object representing a bulb that is switched off.
class LightBulbOffState
  # Symbol naming this state (:off). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :off
  end
end
# Move methods from context into abstract class
# Light bulb that keeps its state in a LightBulbState-built object.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Delegates to the state object's own +state+ reader.
  def state
    @state.state
  end
end
# Factory for the concrete state objects, plus on/off logic moved over
# from LightBulb.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end

  # NOTE(review): this class defines no +state+ reader or writer, so the
  # two methods below cannot run on a LightBulbState as written.
  def turn_on
    if state == :on
      false
    else
      self.state = :on
      true
    end
  end

  def turn_off
    if state == :off
      false
    else
      self.state = :off
      true
    end
  end
end
# State object representing a bulb that is switched on.
class LightBulbOnState
  # Symbol naming this state (:on). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :on
  end
end
# State object representing a bulb that is switched off.
class LightBulbOffState
  # Symbol naming this state (:off). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :off
  end
end
# Remove the shim from the getter
# Light bulb that keeps its state in a LightBulbState-built object.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Delegates to the state object's own +state+ reader.
  def state
    @state.state
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# State object representing a bulb that is switched on.
class LightBulbOnState
  # Symbol naming this state (:on). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :on
  end
end
# State object representing a bulb that is switched off.
class LightBulbOffState
  # Symbol naming this state (:off). Exposed because LightBulb#state calls
  # +state+ on its state object, which otherwise raises NoMethodError.
  attr_reader :state

  def initialize
    @state = :off
  end
end
# Use new factory to instantiate appropriate state
# Light bulb whose state is stored as a LightBulbState-built object and
# read back through a compatibility shim.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Stores the state object that LightBulbState.for builds for +state+.
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Asks the stored object for its +state+; when that raises NoMethodError
  # the stored value itself is returned instead.
  def state
    @state.state
  rescue NoMethodError
    @state
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# State object representing a bulb that is switched on.
class LightBulbOnState
  # Symbol naming this state (:on). Without this reader LightBulb#state
  # falls back to returning the state object itself, which never equals
  # the :on / :off symbols it is compared against.
  attr_reader :state

  def initialize
    @state = :on
  end
end
# State object representing a bulb that is switched off.
class LightBulbOffState
  # Symbol naming this state (:off). Without this reader LightBulb#state
  # falls back to returning the state object itself, which never equals
  # the :on / :off symbols it is compared against.
  attr_reader :state

  def initialize
    @state = :off
  end
end
# Use a shim to keep the tests green
# Light bulb that stores its state value directly and reads it back
# through a compatibility shim.
class LightBulb
  # Stores the given state value as-is.
  attr_writer :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Asks the stored value for its +state+; when that raises NoMethodError
  # the stored value itself is returned instead.
  def state
    @state.state
  rescue NoMethodError
    @state
  end
end
# Factory for the concrete light bulb state objects.
class LightBulbState
  # Returns a new state object for +state+ (:on or :off).
  #
  # Raises ArgumentError for any other value. The former if/elsif chain
  # evaluated to nil in that case and failed with NoMethodError on nil.new.
  def self.for(state)
    state_class =
      case state
      when :on then LightBulbOnState
      when :off then LightBulbOffState
      else raise ArgumentError, "unknown light bulb state: #{state.inspect}"
      end
    state_class.new
  end
end
# State object representing a bulb that is switched on.
class LightBulbOnState
  # Records the :on symbol in the @state instance variable.
  def initialize
    @state = :on
  end
end
# State object representing a bulb that is switched off.
class LightBulbOffState
  # Records the :off symbol in the @state instance variable.
  def initialize
    @state = :off
  end
end
# Inherit specific classes from abstract State class and assign getters
# Light bulb with on, off and dimmed states held as a plain symbol.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Toggles dimming: false when off, otherwise true after moving a dimmed
  # bulb to :on and any other bulb to :dimmed.
  def dim
    return false if state == :off

    self.state = (state == :dimmed ? :on : :dimmed)
    true
  end

  # Explicit setter for the state symbol.
  def state=(state)
    @state = state
  end

  # Explicit getter for the state symbol.
  def state
    @state
  end
end
# Placeholder for the abstract light bulb state; it defines no behaviour.
class LightBulbState
end
# State object representing a bulb that is switched on.
class LightBulbOnState
  # Records the :on symbol in the @state instance variable.
  def initialize
    @state = :on
  end
end
# State object representing a bulb that is switched off.
class LightBulbOffState
  # Records the :off symbol in the @state instance variable.
  def initialize
    @state = :off
  end
end
# Define an abstract State class
# Light bulb with on and off states held as a plain symbol.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Explicit setter for the state symbol.
  def state=(state)
    @state = state
  end

  # Explicit getter for the state symbol.
  def state
    @state
  end
end
# Placeholder for the abstract light bulb state; it defines no behaviour.
class LightBulbState
end
# Explicit getter/setter "Self Encapsulating Field"
# Light bulb with on and off states held as a plain symbol and accessed
# only through its own getter and setter.
class LightBulb
  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Explicit setter for the state symbol.
  def state=(state)
    @state = state
  end

  # Explicit getter for the state symbol.
  def state
    @state
  end
end
# Shoving in dim()
# Light bulb with on, off and dimmed states; the state is publicly
# readable and only writable from inside the class.
class LightBulb
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Toggles dimming: false when off, otherwise true after moving a dimmed
  # bulb to :on and any other bulb to :dimmed.
  def dim
    return false if state == :off

    self.state = (state == :dimmed ? :on : :dimmed)
    true
  end

  # The writer is defined and then hidden from outside callers.
  attr_writer :state
  private :state=
end
# Turn off twice returns false
# Light bulb with on and off states; the state is publicly readable and
# only writable from inside the class.
class LightBulb
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off; false when it is already off, true otherwise.
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # The writer is defined and then hidden from outside callers.
  attr_writer :state
  private :state=
end
# Turn on twice returns false
# Light bulb with on and off states; the state is publicly readable and
# only writable from inside the class.
class LightBulb
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Switches the bulb on; false when it is already on, true otherwise.
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Sets the state to :off; the value of the assignment (:off) is returned.
  def turn_off
    self.state = :off
  end

  # The writer is defined and then hidden from outside callers.
  attr_writer :state
  private :state=
end
# Turn off LightBulb
# Light bulb with on and off states; the state is publicly readable and
# only writable from inside the class.
class LightBulb
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Sets the state to :on; the value of the assignment (:on) is returned.
  def turn_on
    self.state = :on
  end

  # Sets the state to :off; the value of the assignment (:off) is returned.
  def turn_off
    self.state = :off
  end

  # The writer is defined and then hidden from outside callers.
  attr_writer :state
  private :state=
end
# Turn on LightBulb
# Light bulb that can only be switched on; the state is publicly readable
# and only writable from inside the class.
class LightBulb
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Sets the state to :on; the value of the assignment (:on) is returned.
  def turn_on
    self.state = :on
  end

  # The writer is defined and then hidden from outside callers.
  attr_writer :state
  private :state=
end