Kucoin Announcements

This commit is contained in:
stefano 2021-11-16 00:32:06 +01:00
parent 995aeb1ff0
commit 89fa39b3bc
2 changed files with 185 additions and 25 deletions

View File

@ -6,8 +6,6 @@ from sqlalchemy import create_engine
from sqlalchemy.exc import ProgrammingError from sqlalchemy.exc import ProgrammingError
from sqlalchemy.types import DateTime from sqlalchemy.types import DateTime
TABLE = "binanceannouncements"
def get_engine(uri: str): def get_engine(uri: str):
return create_engine(uri, pool_recycle=3600) return create_engine(uri, pool_recycle=3600)
@ -17,12 +15,12 @@ def get_connection(uri: str):
return get_engine(uri).connect() return get_engine(uri).connect()
def get_df(uri): def get_df(uri, table_name):
"""Get dataframe and the first time create DB.""" """Get dataframe and the first time create DB."""
connection = get_connection(uri) connection = get_connection(uri)
try: try:
return pd.read_sql_table( return pd.read_sql_table(
table_name=TABLE, table_name=table_name,
con=connection, con=connection,
index_col='index', index_col='index',
columns=['Token', 'Text', 'Link', 'Datetime discover', 'Datetime announcement'], columns=['Token', 'Text', 'Link', 'Datetime discover', 'Datetime announcement'],
@ -33,12 +31,12 @@ def get_df(uri):
connection.close() connection.close()
def save_df(df, uri): def save_df(df, uri, table_name):
"""Save dataframe on DB.""" """Save dataframe on DB."""
connection = get_connection(uri) connection = get_connection(uri)
try: try:
df.to_sql( df.to_sql(
name=TABLE, name=table_name,
con=connection, con=connection,
index=True, index=True,
index_label='index', index_label='index',

View File

@ -78,13 +78,14 @@ class BinanceAnnouncement(AnnouncementMixin):
TOKEN_COL = 'Token' TOKEN_COL = 'Token'
ANNOUNCEMENT_COL = 'Datetime announcement' ANNOUNCEMENT_COL = 'Datetime announcement'
table_name = 'binanceannouncements'
_df: Optional[pd.DataFrame] = None _df: Optional[pd.DataFrame] = None
def __init__(self, refresh_period: Optional[int] = None, *args, **kwargs): def __init__(self, refresh_period: Optional[int] = None, *args, **kwargs):
self.db_path = kwargs.get('db_path', "user_data/data/BinanceAnnouncements_announcements.csv") self.db_path = kwargs.pop('db_path', "user_data/data/BinanceAnnouncements_announcements.csv")
self._refresh_period = refresh_period or self.REFRESH_PERIOD super().__init__(refresh_period, *args, **kwargs)
def update_announcements(self, page_number=1, page_size=10, history=False) -> pd.DataFrame: def update_announcements(self, page_number=1, page_size=10, max_page=100, history=False) -> pd.DataFrame:
headers = { headers = {
"Cache-Control": "max-age=0", "Cache-Control": "max-age=0",
} }
@ -109,7 +110,7 @@ class BinanceAnnouncement(AnnouncementMixin):
if not history: if not history:
while 'Age' in (response.headers or {}): while 'Age' in (response.headers or {}):
try: try:
url = self.get_api_url(random.randint(1, 100), page_size) url = self.get_api_url(random.randint(1, max_page), page_size)
response = get(url, headers=headers) response = get(url, headers=headers)
except Exception: except Exception:
break break
@ -212,12 +213,12 @@ class BinanceAnnouncement(AnnouncementMixin):
def load_db(self): def load_db(self):
try: try:
self._df = binanceannouncements.get_df(self.db_path) self._df = binanceannouncements.get_df(self.db_path, self.table_name)
except binanceannouncements.ProgrammingError: except binanceannouncements.ProgrammingError:
pass pass
def save_db(self): def save_db(self):
binanceannouncements.save_df(self._df, self.db_path) binanceannouncements.save_df(self._df, self.db_path, self.table_name)
def _save_df(self, df: pd.DataFrame): def _save_df(self, df: pd.DataFrame):
self._df = df.sort_values(by='Datetime announcement') self._df = df.sort_values(by='Datetime announcement')
@ -255,6 +256,168 @@ class BinanceAnnouncement(AnnouncementMixin):
return self.BINANCE_API_URL.format(self.BINANCE_CATALOG_ID, page_number, page_size) return self.BINANCE_API_URL.format(self.BINANCE_CATALOG_ID, page_number, page_size)
class KucoinAnnouncement(AnnouncementMixin):
CATALOG = 'listing'
BASE_URL = "https://www.kucoin.com/_api/cms/articles"
ANNOUNCEMENT_URL = 'https://www.kucoin.com/news/'
API_QUERY = "?page={}&pageSize={}&category={}&c=d3a4f4dcc33844fbbabc3f9cc0abb3cf&lang=en_US"
API_URL = BASE_URL + API_QUERY
# token info
TOKEN_REGEX = re.compile(r'\((\w+)\)')
# 'opens trading', 'defi', 'uniswap', 'airdrop'
KEY_WORDS_BLACKLIST = ['listing postponed', 'futures', 'leveraged']
REFRESH_PERIOD = 3
# storage
COLS = ['Token', 'Text', 'Link', 'Datetime discover', 'Datetime announcement']
TOKEN_COL = 'Token'
ANNOUNCEMENT_COL = 'Datetime announcement'
table_name = 'kucoinannouncements'
_df: Optional[pd.DataFrame] = None
def __init__(self, refresh_period: Optional[int] = None, *args, **kwargs):
self.db_path = kwargs.get('db_path', "user_data/data/KucoinAnnouncements_announcements.csv")
self._refresh_period = refresh_period or self.REFRESH_PERIOD
super().__init__(*args, **kwargs)
def update_announcements(self, page_number=1, page_size=10, max_page=100, history=False) -> pd.DataFrame:
headers = {
"Cache-Control": "max-age=0",
}
response = None
url = self.get_api_url(page_number, page_size)
if history:
# recursive updating
return [self.update_announcements(
page, page_size, history=False
) for page in reversed(range(2, 56))][-1]
try:
now = datetime.now(tz=pytz.utc)
df = self.get_df()
try:
response = get(url, headers=headers)
except Exception as e:
raise TemporaryError(f"Kucoin url ({url}) is not available. Original Exception: {e}")
if response.status_code != 200:
raise TemporaryError(f"Invalid response from url: {url}.\n"
f"Status code: {response.status_code}\n"
f"Content: {response.content.decode()}")
logger.info("Updating from Kucoin...")
updated_list = []
articles = response.json()['items'] or []
for article in articles:
article_link = self.get_announcement_url(article['path'])
article_text = article['title']
tokens = self._get_tokens(article_text)
for token in tokens:
if token:
updated_list.extend(
self._get_new_data(
data=article,
now=now,
token=token,
article_text_lower=article_text.lower(),
article_link=article_link,
article_text=article_text,
df=df
)
)
if df is not None:
df = df.append(pd.DataFrame(updated_list, columns=self.COLS), ignore_index=True)
else:
df = pd.DataFrame(updated_list, columns=self.COLS)
if updated_list:
logger.info(f"Adding tokens to database: {[upd[0] for upd in updated_list]}")
self._save_df(df)
return df
except TemporaryError as e:
# exception handled, re-raise
logger.error(e)
raise e
except Exception as e:
# exception not handled raise OperationalException
logger.error(e)
raise OperationalException(f"Some errors occurred processing KUCOIN data. "
f"Url: {url}.\n"
f"Status code: {response.status_code if response else None}\n"
f"Content: {response.content.decode() if response else None}\n"
f"Exception: {e}")
def _get_new_data(self, data, now, token, article_text_lower, article_link, article_text, df=None):
updated_list = []
conditions_buy = (
df is None # is first time data
or not (token is None or token in df[self.TOKEN_COL].values) # not an existing or null token
)
if conditions_buy:
if any(i in article_text_lower for i in self.KEY_WORDS_BLACKLIST):
logger.debug(f'BLACKLISTED: "{article_text}", skip.')
return updated_list
if token:
logger.info(f'Found new announcement: "{article_text}". Token: {token}.')
updated_list.append(
[token, article_text, article_link, now, self.get_datetime_announcement(data)]
)
return updated_list
def _get_tokens(self, text: str):
return self.TOKEN_REGEX.findall(text)
def get_df(self):
"""Get Dataframe from CSV file or from Database"""
if self._df is None:
self.load_csv_db() if self.db_path.endswith('csv') else self.load_db()
return self._df
def load_csv_db(self):
try:
self._df = pd.read_csv(self.db_path, parse_dates=['Datetime discover', 'Datetime announcement'])
except FileNotFoundError:
pass
def load_db(self):
try:
self._df = binanceannouncements.get_df(self.db_path, self.table_name)
except binanceannouncements.ProgrammingError:
pass
def save_db(self):
binanceannouncements.save_df(self._df, self.db_path, self.table_name)
def _save_df(self, df: pd.DataFrame):
self._df = df.sort_values(by='Datetime announcement')
self._df.to_csv(self.db_path, index=False) if self.db_path.endswith('csv') else self.save_db()
def get_datetime_announcement(self, data: dict):
return datetime.strptime(data['publish_at'], "YYYY-mm-dd HH:MM:SS")
def get_announcement_url(self, path: str) -> str:
# https://www.kucoin.com/news/en-aragon-ant-gets-listed-on-kucoin
return "".join([self.ANNOUNCEMENT_URL, path])
def get_api_url(self, page_number: int = 1, page_size: int = 10) -> str:
return self.API_URL.format(page_number, page_size, self.CATALOG)
class AnnouncementsPairList(IPairList): class AnnouncementsPairList(IPairList):
""" """
Announcement pair list. Announcement pair list.
@ -352,21 +515,20 @@ class AnnouncementsPairList(IPairList):
return [f"{token}/{self._stake_currency}" for token in df[self.pair_exchange.TOKEN_COL]] return [f"{token}/{self._stake_currency}" for token in df[self.pair_exchange.TOKEN_COL]]
def _init_pair_exchange(self, config, **pair_exchange_kwargs): def _init_pair_exchange(self, config, **pair_exchange_kwargs):
exchange = None pair_exchange_kwargs['db_path'] = config.get('ba_database_uri', None)
pair_exchange_kwargs['refresh_period'] = self._refresh_period
if self._pair_exchange == 'binance': if self._pair_exchange == 'binance':
exchange = BinanceAnnouncement( exchange = BinanceAnnouncement(**pair_exchange_kwargs)
db_path=config.get('ba_database_uri', None), elif self._pair_exchange == 'kucoin':
refresh_period=self._refresh_period, exchange = KucoinAnnouncement(**pair_exchange_kwargs)
**pair_exchange_kwargs else:
) raise OperationalException(f'Exchange `{self._pair_exchange}` is not supported yet')
if exchange: assert hasattr(exchange, 'update_announcements'), '`update_announcements` method is required'
assert hasattr(exchange, 'update_announcements'), '`update_announcements` method is required' assert hasattr(exchange, 'TOKEN_COL'), '`TOKEN_COL` attribute is required'
assert hasattr(exchange, 'TOKEN_COL'), '`TOKEN_COL` attribute is required' assert hasattr(exchange, 'ANNOUNCEMENT_COL'), '`ANNOUNCEMENT_COL` attribute is required'
assert hasattr(exchange, 'ANNOUNCEMENT_COL'), '`ANNOUNCEMENT_COL` attribute is required' return exchange
return exchange
raise OperationalException(f'Exchange `{self._pair_exchange}` is not supported yet')
def _validate_pair(self, pair: str, ticker: Dict[str, Any]) -> bool: def _validate_pair(self, pair: str, ticker: Dict[str, Any]) -> bool:
return NotImplemented return NotImplemented